Python with "stream()" interface


#1

Hi friends,

I’m playing around with the “stream()” interface described here: https://github.com/Paradigm4/stream. To begin with, I made sure the installation was fine (two node, four instances on each) by testing the given example:

ubuntu@ip-172-31-9-115:~$ iquery -aq "stream(filter(build(val:double[i=0:0,1,0],0),false), 'printf \"1\nWhat is up?\n\"')"
{instance_id,chunk_no} response
{0,0} 'What is up?'
{1,0} 'What is up?'
{2,0} 'What is up?'
{3,0} 'What is up?'
{4,0} 'What is up?'
{5,0} 'What is up?'
{6,0} 'What is up?'
{7,0} 'What is up?'

Then I had a simple Python code to test with:

ubuntu@ip-172-31-9-115:~$ cat test_scidb_stream.py
#!/usr/bin/python
import sys
if len(sys.argv) < 2:
    print "No input found"
else:
    print "OK OK"

I ran "chmod +x" on the Python file to make it executable from the command line. But when I tested the script through the stream interface, it returned the error below:

$ iquery -aq "stream(filter(build(val:double[i=0:0,1,0],0),false), '/home/ubuntu/test_scidb_stream.py')"
SystemException in file: ChildProcess.cpp function: readIntoBuf line: 169
Error id: scidb::SCIDB_SE_INTERNAL::SCIDB_LE_ILLEGAL_OPERATION
Error description: Internal SciDB error. Illegal operation: error reading from child.
Failed query id: 0.1478246978130241578

Where did I go wrong?

Thanks!
Dongfang


#2

Hi Dongfang!

I knew this Python-streaming day would come; thanks for the question :slight_smile: . There are a couple of things to keep in mind:

  1. We communicate via stdin, which is not the same thing as the command-line arguments.
  2. Each TSV message in the interaction has to be preceded by its number of lines. In other words, a message needs to look like this:

3
abc
def
xyz

Or, if there isn't any data to send, you must still send "zero":

0

  3. At the end of the interaction, SciDB will send the "zero" message, and you need to send a response to that message: either "zero" or some final output.
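To make the framing rule concrete, here is a tiny sketch of how a message would be assembled (the helper name `write_message` is made up for illustration):

```python
import io

def write_message(lines, out):
    # Framing sketch for the stream() TSV protocol: send the line count
    # first, then the lines themselves.
    out.write("%d\n" % len(lines))
    for line in lines:
        out.write(line + "\n")

buf = io.StringIO()
write_message(["abc", "def", "xyz"], buf)
print(buf.getvalue(), end="")   # prints: 3, abc, def, xyz (one per line)
```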

So I wrote a little example Python streaming script. Admittedly, Python is not a language I use often, so please forgive me if some of the form isn't the best:

import sys
end_of_interaction = 0

while (end_of_interaction != 1):
  header = sys.stdin.readline().rstrip()
  if(header != "0"):
    #We receive a message from the SciDB instance:
    num_lines = int(header)  #how many lines did we get?

    #Collect all lines into a list:
    input_lines = []
    for i in range(0, num_lines):
      line = sys.stdin.readline().rstrip()
      input_lines.append(line)

    #Print a response: 
    print(num_lines+1)
    for i in range(0, num_lines):
       print("I got\t" + input_lines[i])
    print("THX!")
    sys.stdout.flush()
    #This will appear in the scidb-stderr.log file:
    sys.stderr.write("I got a chunk with "+ str(num_lines) + " lines of text!\n")
  else:
    #If we receive "0", it means the SciDB instance has no more
    #Data to give us. Here we have the option of also responding with "0"
    #Or sending some other message (i.e. a global sum):
    end_of_interaction = 1
    print("1")
    print("KTHXBYE")
    sys.stdout.flush()
    sys.stderr.write("I got the end-of-data message. Exiting.\n")

I will attach this file to the thread as well.

And here’s an example invocation:

$ iquery -aq "stream(build(<val:double> [i=0:9,4,0], i), 'python /home/apoliakov/streaming/test_stream.py', 'format=tsv')"
{instance_id,chunk_no} response
{0,0} 'I got	0
I got	1
I got	2
I got	3
THX!'
{0,1} 'KTHXBYE'
{1,0} 'I got	4
I got	5
I got	6
I got	7
THX!'
{1,1} 'KTHXBYE'
{2,0} 'I got	8
I got	9
THX!'
{2,1} 'KTHXBYE'
{3,0} 'KTHXBYE'

Note that STDERR output is redirected to the scidb-stderr log files, which is very useful for logging and debugging. Errors will often be found there:

$ cat ~/workspace/scidb/stage/DB-mydb/0/0/scidb-stderr.log 
2016-11-04 13:45:09 (ppid=4031): Started.
I got a chunk with 4 lines of text!
I got the end-of-data message. Exiting.

And the parse operator from accelerated_io_tools can be used to parse the output into SciDB attributes:

$ iquery -aq "apply(parse(stream(build(<val:double> [i=0:9,3,0], i), 'python /home/apoliakov/streaming/test_stream.py', 'format=tsv'), 'num_attributes=2'), number_at_the_end, dcast(a1, double(null)))"
{source_instance_id,chunk_no,line_no} a0,a1,error,number_at_the_end
{0,0,0} 'I got','0',null,0
{0,0,1} 'I got','1',null,1
{0,0,2} 'I got','2',null,2
{0,0,3} 'THX!',null,'short',null
{0,1,0} 'KTHXBYE',null,'short',null
{1,0,0} 'I got','3',null,3
{1,0,1} 'I got','4',null,4
{1,0,2} 'I got','5',null,5
{1,0,3} 'THX!',null,'short',null
{1,1,0} 'KTHXBYE',null,'short',null
{2,0,0} 'I got','6',null,6
{2,0,1} 'I got','7',null,7
{2,0,2} 'I got','8',null,8
{2,0,3} 'THX!',null,'short',null
{2,1,0} 'KTHXBYE',null,'short',null
{3,0,0} 'I got','9',null,9
{3,0,1} 'THX!',null,'short',null
{3,1,0} 'KTHXBYE',null,'short',null

Hope this helps. We're talking about adding Arrow/Feather support to stream() soon. Obviously, the TSV approach isn't the fastest way to transfer data.


#3

Instead of trying to attach the Python example as a file here, I put it in the GitHub repo as an additional example: https://github.com/Paradigm4/streaming/blob/master/examples/python_example.py


#4

Thank you so much, Alex! Very helpful, and I got your Python code running fine natively:

$ python /home/ubuntu/mri_ref/test_stream.py
1
hi scidb
2
I got   hi scidb
THX!
I got a chunk with 1 lines of text!
0
1
KTHXBYE
I got the end-of-data message. Exiting.

but when I called it in SciDB it gave me the same error as before:

$ iquery -aq "stream(build(<val:double> [i=0:9,4,0], i), 'python /home/ubuntu/mri_ref/test_stream.py', 'format=tsv')"
SystemException in file: ChildProcess.cpp function: readIntoBuf line: 169
Error id: scidb::SCIDB_SE_INTERNAL::SCIDB_LE_ILLEGAL_OPERATION
Error description: Internal SciDB error. Illegal operation: error reading from child.
Failed query id: 0.1478288839202903072

Any ideas? I’m using SciDB 15.12 deployed on 2 Amazon EC2 nodes running Ubuntu 14.04.

-Dongfang


#5

Hm… maybe you forgot to copy the file to the second EC2 node? That is a common mistake. The script has to be at the same path on every node.

Remember that you can often see errors in the scidb-stderr log files, so check those (there is one per instance); often the error is recorded there.


#6

Aha! You were right! Copying the code to the second node fixed it! It reminds me that when I installed the library, I also had to copy the .so files to all the nodes.

So for inline code (i.e., the 'printf …' example), SciDB ships the query itself to the remote nodes, right? Is there any particular reason why it can't do that for files?

-Dongfang


#7

Yes - there are a few options there.

The maximum length of a SciDB query is about 128KB, so you could fit some code directly into the stream() program argument. A human might have trouble keeping track of the quoting and escaping, but a machine could do it.
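As a rough illustration of that machine-generated approach, a small helper could assemble an inline-program query string. This is only a sketch: the helper name and the escaping rule shown are assumptions, not a tested recipe.

```python
# Hypothetical helper: build a stream() query that carries a tiny inline
# program in its second argument, instead of pointing at a file that must
# exist on every node. The quote-escaping rule here is an assumption.
def inline_stream_query(program):
    # The program lands inside single quotes in AFL; escape any embedded
    # single quotes so the AFL string stays well-formed.
    escaped = program.replace("'", "\\'")
    return ("stream(filter(build(val:double[i=0:0,1,0],0),false), "
            "'%s')" % escaped)

# The resulting string would then be passed to iquery -aq "...".
print(inline_stream_query('printf "1\\nhello\\n"'))
```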

There's also an approach where we pass two arrays to stream(). In that case, the second array is sent to the child first, and usually that array is replicated (a copy on each instance). The hidden AFL-ism for that is

_sg(array, 0)

So some folks working in R are putting a "program to execute" into a chunk of an array and then sending that across. In R that looks something like this:

#The program to stream is declared as expr1:
expr1 <- expression(
  {
    fn <- function(x) { x + 2 }
    map(fn)
  }
)

#Upload the serialized program into a temp SciDB array:
program <- as.scidb(base64enc::base64encode(serialize(expr1, NULL)))

#Stream through the program:
query1 <- sprintf("stream(build(<a:double>[i=1:4,1,0],i),
                 'R --slave -e \"library(scidbstrm); .program <- unserialize(base64enc::base64decode(getChunk()[[1]])); print(.program); eval(.program)\"',
                  'format=df',  'types=double', _sg(%s,0))", program@name)
print(iquery(query1, return = TRUE))

One chunk can technically contain up to 2GB of text, and we could use multiple chunks too. So this is interesting and promising. We've kicked around ideas of persisting arrays of such programs and managing the various streaming programs that way.
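A Python analogue of the same "program in a chunk" idea is sketched below. The payload and function are made up for illustration; in practice the encoded text would travel inside the replicated array that stream() hands to the child first.

```python
import base64

# Made-up example program text; this string stands in for the chunk payload.
program_source = "def fn(x):\n    return x + 2\n"

# Sender side: encode the program text for transport.
shipped = base64.b64encode(program_source.encode("utf-8"))

# Child side: decode and execute the received program, then call into it.
namespace = {}
exec(base64.b64decode(shipped).decode("utf-8"), namespace)
print(namespace["fn"](3))   # prints 5
```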

There’s an ongoing discussion about how to do this with a “program, plus a piece of state” here: https://github.com/Paradigm4/streaming/issues/6


#8

Hi Alex,

OK, so we now have a new question/comment on the stream() interface :slight_smile: In some sense, stream() lets us map SciDB array chunks through an arbitrary program. In many cases, users want to do some kind of aggregation over the output arrays of these mappers. What's the best practice for doing such aggregation? (I'm doing it manually with concatenation… if you recall my earlier question about achieving concatenation with redimension and merge…) Or do you plan to extend the stream() interface into something like stream(array, function, aggregator)?

Best,
Dongfang


#9

And one more question…

We recently noticed that the child process spawned by "stream()" doesn't carry over the environment variables from the parent shell. We only ran into this with some specific applications, such as LSST.

We tried putting the setup commands in .bashrc and .profile, but the child process didn't seem to pick them up…

Any suggestions?


#10

Hey Dongfang!

Many kinds of aggregation are pretty easy to do right now, though it may require two stream operators inside one query. A common pattern is:

stream(
 _sg(
   stream(DATA, 'first_aggregator_process'),
   2, 0 
 ),
 'second_aggregator_process'
)

Here first_aggregator_process outputs, say, one summary value per instance, and second_aggregator_process then combines the partial summaries. The operator _sg() means "scatter/gather" and redistributes data between instances: the 2 means "send everything to one instance" and the 0 is the number of the target instance.
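For instance, a first-stage aggregator in Python might look like the sketch below. It follows the TSV protocol described earlier in the thread, but assumes the streamed data arrives as one number per line; the function name `aggregate` is made up.

```python
import sys

def aggregate(instream, outstream):
    # Hypothetical first-stage aggregator: accumulate a per-instance total,
    # answer each chunk with "0" (nothing to emit yet), and emit the local
    # sum as a single line at the end of the interaction.
    total = 0.0
    while True:
        header = instream.readline().rstrip()
        if header == "0" or header == "":
            # End of data: reply with one line holding the local sum.
            outstream.write("1\n%s\n" % total)
            outstream.flush()
            return total
        for _ in range(int(header)):
            total += float(instream.readline().rstrip())
        # No output for this chunk:
        outstream.write("0\n")
        outstream.flush()

if __name__ == "__main__":
    aggregate(sys.stdin, sys.stdout)
```

The second-stage process would then read these per-instance sums (one line each) and combine them into a grand total.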

There are some R examples of this pattern in the streaming repository.

This is also doable in Python. In the example script from earlier in this thread, at the point where it prints its response to a chunk, you could simply print the sequence 0\n instead; that would mean "I don't have data to output right now".

Then, at the end of the interaction, instead of the "KTHXBYE" row, you could output the local sum, or what have you.

Note that not only can you write a simple aggregate; you can also join the outputs of stream operators to perform quite a variety of tasks. And since the stream operator accepts a second optional array, you can do some flavors of custom joins inside a stream command.

As for environment variables: yes, the child process is forked from the SciDB process, so it inherits the environment SciDB was started with (which by default happens over SSH). But of course you are launching an arbitrary program, so you can set variables or source files inside the program itself:

$ iquery -aq "stream(filter(build(<val:double>[i=0:0,1,0], 0), false), 'my_variable=SciDB; echo -e \"1\n\$my_variable\"')" | head -n 5
{instance_id,chunk_no} response
{0,0} 'SciDB'
{1,0} 'SciDB'
{2,0} 'SciDB'
{3,0} 'SciDB'
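The same trick works from a streamed Python script: since only SciDB's environment is inherited, set whatever the application needs at the top of the program. The variable name and paths below are hypothetical placeholders.

```python
import os

# Hypothetical setup: the streamed child inherits SciDB's environment, not
# your login shell's, so export what the application needs right here
# (MY_APP_HOME and /opt/my_app are made up for illustration).
os.environ["MY_APP_HOME"] = "/opt/my_app"
os.environ["PATH"] = "/opt/my_app/bin:" + os.environ.get("PATH", "")

print(os.environ["MY_APP_HOME"])   # prints /opt/my_app
```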