
I am trying to parallelize many processes (tasks sent to and executed on many, let's say hundreds of, nodes). I came across this solution: https://unix.stackexchange.com/a/216475

    # initialize a semaphore with a given number of tokens
    open_sem(){
        mkfifo pipe-$$
        exec 3<>pipe-$$
        rm pipe-$$
        local i=$1
        for((;i>0;i--)); do
            printf %s 000 >&3
        done
    }
    # run the given command asynchronously and pop/push tokens
    run_with_lock(){
        local x
        # this read waits until there is something to read
        read -u 3 -n 3 x && ((0==x)) || exit $x
        (
            ( "$@"; )
            # push the return code of the command to the semaphore
            printf '%.3d' $? >&3
        )&
    }

    N=4
    open_sem $N
    for thing in {a..g}; do
        run_with_lock task $thing
    done

I need some explanation here:

open_sem()

  1. what does exec 3<>pipe-$$ do?
  2. why is it removed afterwards?

run_with_lock()

  1. what does this && ((0==x)) || exit $x part mean?
  2. ( "$@"; ) - as far as i know this is list of all arguments passed... but what does it do here?

These are the main obstacles to my understanding of the process, but feel free to explain the whole flow :)

PS: I would just have left a comment under that post, but I have only just registered and do not have the reputation to do so. Also, it might be useful for others. Thanks! J.

  • Not an answer, but are you trying to re-implement GNU Parallel? If so, consider saving time and just use GNU Parallel. – Ole Tange Nov 10 '20 at 17:19

2 Answers


The general idea first

When you read from a FIFO, the read command only finishes once something has been written to the FIFO: a read makes the script "hang" until some other process writes to the same FIFO.

You can use that as a semaphore by

  1. only executing some arbitrary command X after a read command finishes, and
  2. writing once when the command is done, so that another process using the semaphore can finish its read command.

Now, in order to allow N initial processes to start, you set up the semaphore with N write commands to the FIFO. In other words, you get a queue of length N by initializing "the semaphore" with N writes to the FIFO (in this case bound to file descriptor 3).

To repeat: take any command X and wrap it in a function f(), with a read on the line before command X and a write after it, and send the wrapped command as a background job (f X &). With 4 write commands already set up, the first 4 background jobs will execute and the 5th will stop on its read command. As soon as any one of the first 4 commands finishes and executes its wrapped write command, the read in the 5th wrapped command can complete, and it executes its X on the next line.
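
To make that concrete, here is a minimal, self-contained sketch of the same pattern (the function name worker, the FIFO name tokens-$$ and the sleep jobs are placeholders for illustration, not part of the original answer):

    #!/bin/bash
    # build the token queue on file descriptor 3
    mkfifo tokens-$$
    exec 3<>tokens-$$
    rm tokens-$$

    # pre-load 2 tokens, so at most 2 jobs run at the same time
    echo >&3
    echo >&3

    # wrapper: block until a token is available, run the job, then return a token
    worker(){
        read -u 3       # "pop" a token (blocks while the queue is empty)
        "$@"            # run the actual command X
        echo >&3        # "push" a token so the next waiting job may start
    }

    # launch 6 jobs; only 2 of the sleeps ever run concurrently
    for i in {1..6}; do
        worker sleep 1 &
    done
    wait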

open_sem()

  1. exec can be used to redirect the input and output of a process, or to replace the current process. In this case it redirects file descriptor 3 to (write) and from (read) the FIFO: < for input, > for output. If you give exec a command argument (which is not done here), the bash process is replaced by that command, which takes over the shell's PID. If you skip this step, reading from and writing to file descriptor 3 will not give you the blocking read/write behavior that you get with a FIFO.
  2. Removing it is not necessary, just a way to clean up: the descriptor opened by exec keeps the FIFO usable even after its name is removed from the filesystem (see the sketch after this list).
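
A quick, hedged sketch of both points, runnable in an interactive bash session (the name demo-fifo is just an example):

    mkfifo demo-fifo
    exec 3<>demo-fifo    # fd 3 now refers to the FIFO, open for both reading and writing
    rm demo-fifo         # the name disappears, but fd 3 keeps the FIFO itself alive
    echo hello >&3       # writing through fd 3 still works
    read -u 3 line       # reading it back still works
    echo "$line"         # prints: hello
    exec 3>&-            # close fd 3; only now is the FIFO really gone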

run_with_lock()

  1. the read command reads what was written to the FIFO into the variable x. In this example, each write to the FIFO is the exit status of a finished command X, formatted as three digits, so the read can also be used to check that a previous X exited successfully (status 0). If x is not 0, the function exits with that status.
  2. It assumes that the first argument to run_with_lock() is a command and that the remaining arguments are arguments to that command. Thus ( "$@" )& executes that command with its arguments in a subshell that is sent to the background (in the question's loop, run_with_lock task $thing runs task $thing).
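
To see what those three-character tokens look like and how the arithmetic test treats them, here is a small stand-alone snippet (the variable x only mirrors the one in run_with_lock; this is an illustration, not part of the answer's code):

    printf '%.3d\n' 0    # prints 000 : the token a job writes when it exits successfully
    printf '%.3d\n' 1    # prints 001 : the token a job writes when it exits with status 1

    x=000
    ((0==x)) && echo "token 000: a previous job succeeded, keep submitting"
    x=001
    ((0==x)) || echo "token 001: a previous job failed, so run_with_lock would exit"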

I think there is simpler code that can do this. I've based it partly on the example given by the asker, partly on the code given here https://www.mlo.io/blog/2012/06/13/parallel-processes-in-bash/, and partly on @methuselah-0's explanations.

    # make a fifo pipe
    mkfifo pipe
    exec 3<>pipe
    rm -f pipe

    # fill with some number of values (will correspond to the number of spawned processes)
    for i in $(seq 7); do { sleep $i; echo >&3; } & done

    # if a read from the queue succeeds, start a subprocess; otherwise wait
    # for another to finish and restock the queue with a readable entry
    for f in $(ls /Users/pavelkomarov/data_cache); do read <&3; { python3 /Users/pavelkomarov/my_heavy_script.py -t $f; echo >&3; } & done
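
If this runs as a script rather than interactively, it may also be worth waiting for the background jobs and closing the descriptor at the end (my addition, not part of the original snippet):

    wait          # block until every background { python3 ...; echo >&3; } job has finished
    exec 3>&-     # close file descriptor 3, releasing the now-unnamed FIFO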