Custom operator to run Python module


#1

Hi there,

I'm in the process of creating an operator plugin that can run external Python code on array data from SciDB :wink: I am also using Boost::Python to help load the Python module.

The operator takes 3 arguments:

  • the input array name
  • the Python script location
  • the Python module (function) name

To keep it simple I only pass one SciDB array to the Python script, and return one Python list back.

LogicalOperator has been subclassed to return the same ArrayDesc as the input array, since the dimensions of the returned array are unknown before execution.
PhysicalOperator has been subclassed to set up the Python environment, convert the SciDB array to a Python list, call the function with the list as its argument and convert the result back to a SciDB array.

LogicalPython(const std::string& logicalName, const std::string& alias):
        LogicalOperator(logicalName, alias)
    {
        // Expecting one array as input
        ADD_PARAM_IN_ARRAY_NAME()

        // Expecting the location of Python script
        ADD_PARAM_CONSTANT("string")

        // Expecting the function name in the script
        ADD_PARAM_CONSTANT("string")
    }
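
The physical side then looks roughly like this - just a skeleton sketch to show the flow; the class name PhysicalPython and the helper functions are placeholders, not the actual code:

// Skeleton only: convertToPyList / callPythonFunction / convertToMemArray are
// placeholder helpers standing in for the real conversion and call logic.
boost::shared_ptr<Array> PhysicalPython::execute(
        std::vector< boost::shared_ptr<Array> >& inputArrays,
        boost::shared_ptr<Query> query)
{
    boost::shared_ptr<Array> input = inputArrays[0];

    boost::python::list   pyInput  = convertToPyList(input);       // SciDB array -> Python list
    boost::python::object pyResult = callPythonFunction(pyInput);  // call the loaded module function
    return convertToMemArray(pyResult, query);                     // Python list -> MemArray
}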

So in short, what the plugin does is:

  1. fetch the array data from SciDB and turn it into a Python list structure in C++
  2. load the specific Python script (path and module name as arguments) and pass the Python list into it
  3. get the result back and convert it from Python list to SciDB array (MemArray for now)

All tested out and looking good - until I try to save the data back into SciDB via “store”, which complains about various things such as a chunk already existing or chunk boundaries being wrong.

Then I realised the LogicalOperator cannot simply return the same ArrayDesc as the input array; it must know the result dimensions, which are unknown until PhysicalOperator::execute() is called.
After a bit of restructuring I now execute the Python script in the LogicalOperator to solve this chicken-and-egg problem…

But now I can’t read the input array to pass it to the Python script - in PhysicalOperator it is of course provided as part of the arguments, but not in LogicalOperator.

So I did something like the following in my LogicalOperator subclass's inferSchema():

// Get the input array
const string arrayName = ((boost::shared_ptr<OperatorParamReference>&)_parameters[0])
                                    ->getObjectName();
boost::shared_ptr<Array> inputArray = query->getArray(arrayName);
assert(inputArray); // this is alright

// Copy data from SciDB array to Python list
boost::shared_ptr<ConstItemIterator> inputItr = inputArray->getItemIterator(attrId);
const Dimensions& dims = inputArray->getArrayDesc().getDimensions();
buffer = boost::python::list();

// 1 dimension source array
if (dims.size() == 1) { 
    while (!inputItr->end()) {  // problem: inputItr is already at the end here
        buffer.append( ValueToDouble(TID_DOUBLE, inputItr->getItem()) );
        ++(*inputItr);
    }
}
...

However inputItr always points to the end, and inputItr->setPosition() always returns false.

I even tried reimplementing the LogicalOperator::inferArrayAccess() method to request a lock for the input array, but that doesn’t help - I guess the lock is only acquired just before PhysicalOperator::execute() is called.

Are there any tricks I can use to access the array data in advance, inside LogicalOperator::inferSchema()?

Many thanks,
Patrick


#2

Hey Patrick.

I think query->getArray() is giving you an empty array because you’re passing in an unqualified name. I believe you need to pass in a versioned name, i.e. not “array_name” but “array_name@version_number”. As an experiment, create some array “foo”, make sure you populate it with some data, then in the operator say:

boost::shared_ptr<Array> inputArray = query->getArray("foo@1");

I think that should make things happy. If it succeeds, look at the SystemCatalog class. It has facilities to get the latest version for an array so you can construct this string correctly.
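Something along these lines should work - a rough sketch only; I'm writing the SystemCatalog calls from memory, so double-check the exact method names against your source tree:

// Sketch: build "name@latest_version" for query->getArray().
// The SystemCatalog method names here are from memory and may differ per release.
std::string versionedName(const std::string& name)
{
    ArrayDesc desc;
    SystemCatalog::getInstance()->getArrayDesc(name, desc);
    VersionID lastVersion = SystemCatalog::getInstance()->getLastVersion(desc.getId());
    std::stringstream ss;
    ss << name << "@" << lastVersion;
    return ss.str();
}

boost::shared_ptr<Array> inputArray = query->getArray(versionedName(arrayName));
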
Looks like you’re doing some exciting things! Please keep us posted.


#3

If you like, there is also a higher-level alternative to work around this problem.

Suppose you can make your output dense and one-dimensional. It looks like this:
[i=0:*,500000,0]

500K is a decent chunk size for dense 1D arrays. Suppose we can make it so that the coordinate “i” is not important. All the important position data can become (extra) arguments; i can simply be a dumb counter. Then you can write this array out, from every instance simultaneously, provided you manipulate the coordinates in the “round robin” fashion. That is, suppose you have 3 instances.

Instance 0 will first write the chunk {0}, then {1500000} then {3000000}
Instance 1 will first write the chunk {500000}, then {2000000} then {3500000}
Instance 2 will first write the chunk {1000000}, then {2500000} then {4000000}

And of course it doesn’t hurt if the last chunk on each instance is partially filled. As we said, i is meaningless. After this DBArray is written and stored, you can redimension it into whatever you like.

I had a problem where I needed to write out a lot of output as fast as possible and did this. This is a simplified, abridged version of what the loop looks like:

            while (_tuplesInserted < elementsToInsert)
            {
                vector<shared_ptr<ArrayIterator> > outputArrayIters(nAttrs);
                vector<shared_ptr<ChunkIterator> > outputChunkIters(nAttrs);
                Coordinates chunkPosition(1,_tuplesInserted * _numInstances + _instanceId * _settings.getOutputChunkSize());
                for (AttributeID i = 0; i < nAttrs; ++i)
                {
                    outputArrayIters[i] = dstArray->getIterator(i);
                    Chunk& newChunk = outputArrayIters[i]->newChunk(chunkPosition);
                    outputChunkIters[i] = newChunk.getIterator(q, ChunkIterator::NO_EMPTY_CHECK | ChunkIterator::SEQUENTIAL_WRITE);
                }
                Value v;
                Value boolTrue;
                boolTrue.setBool(true);
                for (Coordinates cellPos = chunkPosition;
                     cellPos[0] < chunkPosition[0] + (Coordinate) _settings.getOutputChunkSize() && _tuplesInserted < elementsToInsert;
                     ++cellPos[0])
                {
                    Tuple const& tuple = tuples[_tuplesInserted];
                    for (size_t i=0, n = tuple.size(); i<n; ++i)
                    {
                        outputChunkIters[i]->setPosition(cellPos);
                        outputChunkIters[i]->writeItem(tuple[i]);
                    }
                    outputChunkIters[nAttrs-1]->setPosition(cellPos);
                    outputChunkIters[nAttrs-1]->writeItem(boolTrue);
                    ++_tuplesInserted;
                    _maxPos = cellPos[0];
                }
                for (AttributeID i = 0; i < nAttrs; ++i)
                {
                    outputChunkIters[i]->flush();
                }
            }

#4

Hi apoliakov,

Thanks for the prompt reply - it really works! :smiley: It turned out to be just one tiny little bug stopping the whole program from working…

Interesting way to write the data out in parallel - I read somewhere in the code that I can only construct one chunk iterator at a time to read from/write to the array. But I will certainly try the suggested way to further optimise the plugin :wink:

Now I am extending the plugin to pass 2-dimensional arrays to and from the Python module. Copying items from the array to a Python list is not too hard as I can use the array's ConstItemIterator combined with setPosition() to get the items one by one, though it may not be the most efficient way.
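
For reference, the copy loop I use looks roughly like this (a sketch only - it assumes a single double attribute at attrId, dense data with dimensions starting at 0, and uses the same iterator calls as the snippet in my first post):

namespace py = boost::python;

const Dimensions& dims = inputArray->getArrayDesc().getDimensions();
boost::shared_ptr<ConstItemIterator> itr = inputArray->getItemIterator(attrId);

py::list outer;
for (uint64_t i = 0; i < dims[0].getLength(); ++i) {
    py::list row;
    for (uint64_t j = 0; j < dims[1].getLength(); ++j) {
        Coordinates pos(2);
        pos[0] = Coordinate(i);  // assumes both dimensions start at 0
        pos[1] = Coordinate(j);
        if (itr->setPosition(pos)) {
            row.append( ValueToDouble(TID_DOUBLE, itr->getItem()) );
        }
    }
    outer.append(row);
}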

Copying data from the Python list back to SciDB is the fun part, as I can choose the chunk sizes for the 2D array independently of the input array. Let i and j be the dimension names of the 2D array; to keep the implementation simple, at first I chose the chunk size of i to be 1 and that of j to be the length of the 2nd dimension, so that I can use the Chunk::allocateAndCopy() function easily:

namespace py = boost::python;
// Data in Python list
const py::list& buffer = ...

// Get the dimensions of the result list
const vector<uint64_t>& bufferDims = getDimensionsFromPyList( buffer );

// Now we know the dimensions of the result, we need a new schema for the
// result array constructor
Dimensions newDimensions(bufferDims.size());
if (bufferDims.size() == 1) {
    ...
}
else {
    newDimensions[0] = DimensionDesc("i",                             // name
                                     Coordinate(0),                   // start
                                     Coordinate(0),                   // curStart
                                     Coordinate(bufferDims.at(0)-1),  // end
                                     Coordinate(bufferDims.at(0)-1),  // curEnd
                                     1,                               // chunk size
                                     0);                              // overlap

    newDimensions[1] = DimensionDesc("j",                             // name
                                     Coordinate(0),                   // start
                                     Coordinate(0),                   // curStart
                                     Coordinate(bufferDims.at(1)-1),  // end
                                     Coordinate(bufferDims.at(1)-1),  // curEnd
                                     bufferDims.at(1),                // chunk size
                                     0);                              // overlap
}

// New schema
Attributes newAttributes(1);
newAttributes[0] = inputSchema.getAttributes()[0];
const ArrayDesc resultSchema(inputSchema.getName() + "_python_output",
                             newAttributes,
                             newDimensions);

// Array constructed from the new schema
boost::shared_ptr<Array> resultArray (new MemArray( resultSchema ));

AttributeDesc const& dstAttr = resultSchema.getAttributes()[0];

if (bufferDims.size() == 1) {
    ...
}
else if (bufferDims.size() == 2) {
    uint64_t i = 0; // index to item in list buffer, 1st dimension
    uint64_t j = 0; // index to item in list buffer, 2nd dimension
    uint64_t t = 0; // index to item in temporary array for copying

    const uint64_t numOfChunks  = bufferDims.at(0);
    DimensionDesc const& dstDim = resultSchema.getDimensions()[1];
    const uint64_t dstLen       = dstDim.getLength();
    const uint64_t dstChunkSize = dstDim.getChunkInterval();
    boost::shared_ptr<ArrayIterator> resultItr = resultArray->getIterator(dstAttr.getId());
    boost::shared_array<double> tmpBuffer (new double[dstChunkSize]);

    for (i = 0; i < numOfChunks; ++i) {

        const py::list& subBuffer = py::extract<py::list>( buffer[i] );

        // Update existing chunk or create new chunk
        Coordinates pos;
        pos.push_back(i);
        pos.push_back(0);
        Chunk *outChunk = (resultItr->setPosition(pos)) ? &resultItr->updateChunk()
                                                        : &resultItr->newChunk(pos);

        // Copy Python list data into tmp buffer
        for (j=0, t=0; (t < dstChunkSize) && (j < dstLen); ++j, ++t) {
            tmpBuffer[t] = py::extract<double>( subBuffer[j] );
        }
        // Zero out the rest
        for ( ; t < dstChunkSize; ++t ) {
            tmpBuffer[t] = 0;
        }
        outChunk->allocateAndCopy((const char *)(tmpBuffer.get()),
                                  sizeof(double)*dstChunkSize, // byteSize
                                  false,                       // isSparse
                                  false,                       // isRLE
                                  dstChunkSize,                // count
                                  query);
    }
}

This method works, however it is slower than copying data from the array to a Python list - I guess it is due to my choice of chunk size 1 for the first dimension. If I increase the chunk size of the first dimension to, let's say, n, then I should allocate the chunk block to be (n x chunk size of j), and the code will need to be smart enough to copy only the data for positions within that chunk block from the Python list of lists in order to make allocateAndCopy() work, but jumping around different lists within a list in Python is not efficient either…

If I treat the output as a one-dimensional array with size (i x j), I wonder if I can just copy all the data out of the Python list (of lists) as one big chunk as you suggested, and then redimension it later? Can I redimension it within my plugin without having to use iquery to do it afterwards (best to be transparent to the user)? How big can I make the chunk size?

Or is there a much preferred way…?

Many thanks again,
Patrick


#5

Hi Patrick,

Sorry for the delay in response.

  1. You are using allocateAndCopy with a plain shared_array of doubles. This (non-RLE) way used to work but is becoming deprecated. If you do this you may already encounter errors with storing the output. If you must use allocateAndCopy(), then the object you pass in must be a valid RLEPayload.

  2. The recommended way to fill a chunk is to use chunkIterator with the option SEQUENTIAL_WRITE set. That essentially creates an RLEPayload for you inside the chunk. But if you use SEQUENTIAL_WRITE, you must fill the chunk in stride-major order. So you must write values with last-coordinate incremented first, as in:
    {0,0}
    {0,1}
    {0,2}

    ...
    {0,10}

    {1,0}
    {1,1}
    {1,2}

You don’t have to use SEQUENTIAL_WRITE. If you don’t use it then the ChunkIterator will use an rb-tree to fill the data, and you can populate the tree in any order you want. But then you get higher memory overhead plus the log-n overhead of inserting into the tree and then linearizing it at the end.
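
To illustrate the SEQUENTIAL_WRITE path from point 2, here is a minimal sketch of filling a single 2D chunk in stride-major order (my own illustration, not taken from your operator: it assumes one dense double attribute, and rows, cols and data[][] are placeholders for your chunk extents and values):

boost::shared_ptr<ArrayIterator> outputIter = resultArray->getIterator(attrId);
Coordinates chunkPos(2, 0);                       // chunk starting at {0,0}
Chunk& chunk = outputIter->newChunk(chunkPos);
boost::shared_ptr<ChunkIterator> chunkIter =
    chunk.getIterator(query, ChunkIterator::SEQUENTIAL_WRITE);

Value v;
Coordinates cellPos(2);
for (Coordinate i = 0; i < rows; ++i)             // first dimension
{
    for (Coordinate j = 0; j < cols; ++j)         // last dimension varies fastest
    {
        cellPos[0] = i;
        cellPos[1] = j;
        chunkIter->setPosition(cellPos);
        v.setDouble(data[i][j]);
        chunkIter->writeItem(v);
    }
}
chunkIter->flush();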

  3. How big should a chunk be? This is a multi-constraint sweet-spot calculation that depends on the density of your data, the IO device, the network and so on. A good heuristic is to aim for between 100K and 1M non-empty elements. If your data is sparse, then make the chunk bigger and leave room so that on average logical_chunk_volume * data_density =~ 1M cells. If you are using a high number of attributes, you might want to shrink it as some older code will try to hold all attributes of one chunk in memory at once. Hopefully that will be addressed soon.

  4. Using 1D output and redimensioning.
    Yeah it could be advantageous to create a 1D dense output. In 1D it’s very easy to fill data with SEQUENTIAL_WRITE and you can guarantee 100% density and therefore control the chunk sizing. So in essence you will be delaying the redimension problem. Can you redimension from within the operator? Yeah you can do anything you want from within the operator :sunglasses: – but there isn’t a clean API to call. You can read PhysicalRedimension.cpp and find that it calls a common routine called “redimensionArray” and then try to mimic the call.

On second thought – you mentioned nothing about sparse data. Are you 100% dense? If that’s the case - I would go with filling out 2D chunks with a good enough chunk size, with sequential write.
Please keep us posted on your progress!


#6

Hi apoliakov,

Thanks for the response. I tried the plugin with the latest 13.2 release and it still works with the store operator… fingers crossed :wink:

Yes the data is 100% dense, so I will probably change it to sequential write with 2D chunks. Will keep you posted :smile: Thanks again!

Cheers,
Patrick


#7

One more thing.

How many input arrays are there in your operator, and how many output arrays? Are you doing anything to redistribute / move data between instances? There are some special considerations when it comes to the distribution of data that the operator outputs. Let me know if you think we need to exchange more info on this.

- Alex Poliakov (by the way :smile: )

#8

Hi Alex!

At the moment I have only one input array and one output array in the operator, with no overlap, and I am testing against one instance. In the long term I would like to boost the performance by running multiple instances - I thought the data would be distributed automatically by SciDB to multiple instances according to the chunking of the array that I have produced? It would be good to know more in this area as the data set is getting quite large - up to 32 x 1,000,000 observations - and it will eventually hit the memory limit on a single instance. Thanks!

Cheers,
Patrick


#9

Yep,

SciDB has a “natural” way of smearing chunks across instances. In the code it’s called “psRoundRobin” but it actually happens to be a hash. See the method ArrayDesc::getChunkNumber - it’s that, modulo the number of instances.
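
Just to illustrate the rule (my own sketch - I'm treating getChunkNumber as taking the chunk position, so check the actual signature in your source tree):

// Illustration only: which instance a chunk lands on under psRoundRobin.
InstanceID instanceForChunk(ArrayDesc const& desc,
                            Coordinates const& chunkPos,
                            size_t numInstances)
{
    return desc.getChunkNumber(chunkPos) % numInstances;
}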

Not sure if you need to be bothered with all that… To make a long story short: if the following conditions are all true:

  • single input array
  • output array has the same dimensions as input array
  • output on each instance contains the same chunk coordinates as input on that instance
then you don’t need to do anything special about distribution.

Essentially if you’re picking up a chunk on your instance, doing some processing, and then returning the same-shape result - no worries. Shrinking or inflating chunks should also be OK. Sounds like that’s what you’re doing. Anything more esoteric? Let me know.


#10

Right! That’s kinda my current implementation, except that the output array may not have the same sizes or dimensions as the input array. The input array is usually 2-dimensional and the output array can be 1- or 2-dimensional depending on what the Python script is going to produce.

One more question: at the moment I am using MemArray as a placeholder to copy data back from the Python list output (in the code above). If I am going to use multiple instances later, I assume the content will be written to memory on the local instance first, then SciDB will redistribute the data to other instances according to chunk size/overlap if a store() operator is used afterwards? Or should (can) I swap MemArray for DBArray straight away to avoid hitting the memory limit on the local instance? Thanks again.

Cheers,
Pat


#11

Yeah using MemArray is fine. MemArray is actually more than the name suggests; it will save chunks that are not in use to temp files using an LRU cache that is not allowed to exceed “mem-array-threshold” bytes in size. But only chunks that are not currently open are eligible for being swapped out. So you gotta be a little careful in how you open and flush them.

You can substitute MemArray with DBArray but then you have to worry about 20 more things: getting a unique array id, array locks, rollback, distribution, etc. I wouldn’t recommend this yet, based on what I know about your plugin.

If you think you might be breaking the distribution, you can just add these two overrides to your PhysicalOperator subclass:

    virtual bool changesDistribution(std::vector<ArrayDesc> const&) const
    {
        return true;
    }

    virtual ArrayDistribution getOutputDistribution(
            std::vector<ArrayDistribution> const& inputDistributions,
            std::vector< ArrayDesc> const& inputSchemas) const
    {
        return ArrayDistribution(psUndefined);
    }

That’s the magical incantation that tells the optimizer to insert a redistribute operation between your operator and store(), if store is present. Many builtin ops use this - PhysicalCrossJoin, PhysicalSlice, PhysicalSubArray…


#12

Thanks Alex! That’s exactly what I want! :smiley:


#13

Hi, Patrick,

May I ask how you call the Python function with a Boost::Python list? Most examples I saw show the reverse direction: they use Boost::Python to expose C/C++ code as a Python function.

Also, for SciDB, isn’t it quite slow to copy all the elements from the source array into a vector or list in your code?


#14

Hi ctozlm,

Sorry for the really late reply - I hope you have already figured out how to do it :smile:
Anyway, here is a code snippet of what I have done with Boost::Python to load the custom module; hope it helps.

namespace py = boost::python;

PyThreadState* m_oldPyState;
PyThreadState* m_newPyState;
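
// Note: the code below assumes the main Python interpreter has already been
// initialized (i.e. Py_Initialize() was called elsewhere in the plugin).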

// Create sub-interpreter
m_newPyState = Py_NewInterpreter();
if (!m_newPyState) {
    throw USER_EXCEPTION(SCIDB_SE_EXECUTION, SCIDB_LE_UNKNOWN_ERROR)
              << ("PythonSubInterpreter: failed to call Py_NewInterpreter()");
}

// Swap to the new interpreter
m_oldPyState = PyThreadState_Swap(m_newPyState);

py::object main_module    = py::import("__main__");
py::object main_namespace = main_module.attr("__dict__");

// Add custom module path
// modulePath: full path to the Python module that you want to call
const string appendPath("import sys; sys.path.append('" + modulePath + "')");
py::exec(appendPath.c_str(), main_namespace);

// Import custom module and function
py::object custom_module = py::import(moduleName.c_str());
py::object custom_function = custom_module.attr(pyFuncName.c_str());

// Now call the custom function
py::object srcObj;
...
py::object dstObj  = custom_function( srcObj );
...

// Clean up the sub-interpreter (Py_EndInterpreter requires its thread state
// to be the current one), then swap back to the previous thread state
Py_EndInterpreter(m_newPyState);
PyThreadState_Swap(m_oldPyState);
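
And for completeness, building the input list and reading the result back looks something like this (again just a sketch):

py::list srcList;
srcList.append(1.0);
srcList.append(2.0);

py::object dstObj = custom_function( srcList );
py::list dstList  = py::extract<py::list>( dstObj );
double first      = py::extract<double>( dstList[0] );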