Mercurial > dotfiles
diff .zfun/async @ 495:d3d52766cfcd
zsh: update async to 1.8.4
author | Augie Fackler <raf@durin42.com> |
---|---|
date | Sun, 27 Sep 2020 16:10:50 -0400 |
parents | e84b6da69ea0 |
children | 2ea29b487b06 |
line wrap: on
line diff
--- a/.zfun/async +++ b/.zfun/async @@ -3,99 +3,221 @@ # # zsh-async # -# version: 1.1.0 +# version: v1.8.4 # author: Mathias Fredriksson # url: https://github.com/mafredri/zsh-async # +typeset -g ASYNC_VERSION=1.8.4 +# Produce debug output from zsh-async when set to 1. +typeset -g ASYNC_DEBUG=${ASYNC_DEBUG:-0} + +# Execute commands that can manipulate the environment inside the async worker. Return output via callback. +_async_eval() { + local ASYNC_JOB_NAME + # Rename job to _async_eval and redirect all eval output to cat running + # in _async_job. Here, stdout and stderr are not separated for + # simplicity, this could be improved in the future. + { + eval "$@" + } &> >(ASYNC_JOB_NAME=[async/eval] _async_job 'cat') +} + # Wrapper for jobs executed by the async worker, gives output in parseable format with execution time _async_job() { - # Store start time as double precision (+E disables scientific notation) + # Disable xtrace as it would mangle the output. + setopt localoptions noxtrace + + # Store start time for job. float -F duration=$EPOCHREALTIME - # Run the command - # - # What is happening here is that we are assigning stdout, stderr and ret to - # variables, and then we are printing out the variable assignment through - # typeset -p. This way when we run eval we get something along the lines of: - # eval " - # typeset stdout=' M async.test.sh\n M async.zsh' - # typeset ret=0 - # typeset stderr='' - # " - unset stdout stderr ret - eval "$( + # Run the command and capture both stdout (`eval`) and stderr (`cat`) in + # separate subshells. When the command is complete, we grab write lock + # (mutex token) and output everything except stderr inside the command + # block, after the command block has completed, the stdin for `cat` is + # closed, causing stderr to be appended with a $'\0' at the end to mark the + # end of output from this job. + local jobname=${ASYNC_JOB_NAME:-$1} out + out="$( + local stdout stderr ret tok { stdout=$(eval "$@") ret=$? - typeset -p stdout ret - } 2> >(stderr=$(cat); typeset -p stderr) - )" + duration=$(( EPOCHREALTIME - duration )) # Calculate duration. - # Calculate duration - duration=$(( EPOCHREALTIME - duration )) - - # stip all null-characters from stdout and stderr - stdout=${stdout//$'\0'/} - stderr=${stderr//$'\0'/} - - # if ret is missing for some unknown reason, set it to -1 to indicate we - # have run into a bug - ret=${ret:--1} + print -r -n - $'\0'${(q)jobname} $ret ${(q)stdout} $duration + } 2> >(stderr=$(cat) && print -r -n - " "${(q)stderr}$'\0') + )" + if [[ $out != $'\0'*$'\0' ]]; then + # Corrupted output (aborted job?), skipping. + return + fi - # Grab mutex lock - read -ep >/dev/null + # Grab mutex lock, stalls until token is available. + read -r -k 1 -p tok || return 1 - # return output (<job_name> <return_code> <stdout> <duration> <stderr>) - print -r -N -n -- "$1" "$ret" "$stdout" "$duration" "$stderr"$'\0' + # Return output (<job_name> <return_code> <stdout> <duration> <stderr>). + print -r -n - "$out" - # Unlock mutex - print -p "t" + # Unlock mutex by inserting a token. + print -n -p $tok } # The background worker manages all tasks and runs them without interfering with other processes _async_worker() { + # Reset all options to defaults inside async worker. + emulate -R zsh + + # Make sure monitor is unset to avoid printing the + # pids of child processes. + unsetopt monitor + + # Redirect stderr to `/dev/null` in case unforseen errors produced by the + # worker. For example: `fork failed: resource temporarily unavailable`. + # Some older versions of zsh might also print malloc errors (know to happen + # on at least zsh 5.0.2 and 5.0.8) likely due to kill signals. + exec 2>/dev/null + + # When a zpty is deleted (using -d) all the zpty instances created before + # the one being deleted receive a SIGHUP, unless we catch it, the async + # worker would simply exit (stop working) even though visible in the list + # of zpty's (zpty -L). This has been fixed around the time of Zsh 5.4 + # (not released). + if ! is-at-least 5.4.1; then + TRAPHUP() { + return 0 # Return 0, indicating signal was handled. + } + fi + local -A storage local unique=0 + local notify_parent=0 + local parent_pid=0 + local coproc_pid=0 + local processing=0 + + local -a zsh_hooks zsh_hook_functions + zsh_hooks=(chpwd periodic precmd preexec zshexit zshaddhistory) + zsh_hook_functions=(${^zsh_hooks}_functions) + unfunction $zsh_hooks &>/dev/null # Deactivate all zsh hooks inside the worker. + unset $zsh_hook_functions # And hooks with registered functions. + unset zsh_hooks zsh_hook_functions # Cleanup. + + close_idle_coproc() { + local -a pids + pids=(${${(v)jobstates##*:*:}%\=*}) + + # If coproc (cat) is the only child running, we close it to avoid + # leaving it running indefinitely and cluttering the process tree. + if (( ! processing )) && [[ $#pids = 1 ]] && [[ $coproc_pid = $pids[1] ]]; then + coproc : + coproc_pid=0 + fi + } + + child_exit() { + close_idle_coproc + + # On older version of zsh (pre 5.2) we notify the parent through a + # SIGWINCH signal because `zpty` did not return a file descriptor (fd) + # prior to that. + if (( notify_parent )); then + # We use SIGWINCH for compatibility with older versions of zsh + # (pre 5.1.1) where other signals (INFO, ALRM, USR1, etc.) could + # cause a deadlock in the shell under certain circumstances. + kill -WINCH $parent_pid + fi + } - # Process option parameters passed to worker - while getopts "np:u" opt; do + # Register a SIGCHLD trap to handle the completion of child processes. + trap child_exit CHLD + + # Process option parameters passed to worker. + while getopts "np:uz" opt; do case $opt in - # Use SIGWINCH since many others seem to cause zsh to freeze, e.g. ALRM, INFO, etc. - n) trap 'kill -WINCH $ASYNC_WORKER_PARENT_PID' CHLD;; - p) ASYNC_WORKER_PARENT_PID=$OPTARG;; + n) notify_parent=1;; + p) parent_pid=$OPTARG;; u) unique=1;; + z) notify_parent=0;; # Uses ZLE watcher instead. esac done - # Create a mutex for writing to the terminal through coproc - coproc cat - # Insert token into coproc - print -p "t" + # Terminate all running jobs, note that this function does not + # reinstall the child trap. + terminate_jobs() { + trap - CHLD # Ignore child exits during kill. + coproc : # Quit coproc. + coproc_pid=0 # Reset pid. + + if is-at-least 5.4.1; then + trap '' HUP # Catch the HUP sent to this process. + kill -HUP -$$ # Send to entire process group. + trap - HUP # Disable HUP trap. + else + # We already handle HUP for Zsh < 5.4.1. + kill -HUP -$$ # Send to entire process group. + fi + } - while read -r cmd; do - # Separate on spaces into an array - cmd=(${=cmd}) - local job=$cmd[1] + killjobs() { + local tok + local -a pids + pids=(${${(v)jobstates##*:*:}%\=*}) + + # No need to send SIGHUP if no jobs are running. + (( $#pids == 0 )) && continue + (( $#pids == 1 )) && [[ $coproc_pid = $pids[1] ]] && continue + + # Grab lock to prevent half-written output in case a child + # process is in the middle of writing to stdin during kill. + (( coproc_pid )) && read -r -k 1 -p tok + + terminate_jobs + trap child_exit CHLD # Reinstall child trap. + } + + local request do_eval=0 + local -a cmd + while :; do + # Wait for jobs sent by async_job. + read -r -d $'\0' request || { + # Unknown error occurred while reading from stdin, the zpty + # worker is likely in a broken state, so we shut down. + terminate_jobs + + # Stdin is broken and in case this was an unintended + # crash, we try to report it as a last hurrah. + print -r -n $'\0'"'[async]'" $(( 127 + 3 )) "''" 0 "'$0:$LINENO: zpty fd died, exiting'"$'\0' + + # We use `return` to abort here because using `exit` may + # result in an infinite loop that never exits and, as a + # result, high CPU utilization. + return $(( 127 + 1 )) + } + + # We need to clean the input here because sometimes when a zpty + # has died and been respawned, messages will be prefixed with a + # carraige return (\r, or \C-M). + request=${request#$'\C-M'} # Check for non-job commands sent to worker - case $job in - _unset_trap) - trap - CHLD; continue;; - _killjobs) - # Do nothing in the worker when receiving the TERM signal - trap '' TERM - # Send TERM to the entire process group (PID and all children) - kill -TERM -$$ &>/dev/null - # Reset trap - trap - TERM - continue - ;; + case $request in + _killjobs) killjobs; continue;; + _async_eval*) do_eval=1;; esac - # If worker should perform unique jobs - if (( unique )); then - # Check if a previous job is still running, if yes, let it finnish + # Parse the request using shell parsing (z) to allow commands + # to be parsed from single strings and multi-args alike. + cmd=("${(z)request}") + + # Name of the job (first argument). + local job=$cmd[1] + + # Check if a worker should perform unique jobs, unless + # this is an eval since they run synchronously. + if (( !do_eval )) && (( unique )); then + # Check if a previous job is still running, if yes, + # skip this job and let the previous one finish. for pid in ${${(v)jobstates##*:*:}%\=*}; do if [[ ${storage[$job]} == $pid ]]; then continue 2 @@ -103,16 +225,47 @@ done fi - # Run task in background - _async_job $cmd & - # Store pid because zsh job manager is extremely unflexible (show jobname as non-unique '$job')... - storage[$job]=$! + # Guard against closing coproc from trap before command has started. + processing=1 + + # Because we close the coproc after the last job has completed, we must + # recreate it when there are no other jobs running. + if (( ! coproc_pid )); then + # Use coproc as a mutex for synchronized output between children. + coproc cat + coproc_pid="$!" + # Insert token into coproc + print -n -p "t" + fi + + if (( do_eval )); then + shift cmd # Strip _async_eval from cmd. + _async_eval $cmd + else + # Run job in background, completed jobs are printed to stdout. + _async_job $cmd & + # Store pid because zsh job manager is extremely unflexible (show jobname as non-unique '$job')... + storage[$job]="$!" + fi + + processing=0 # Disable guard. + + if (( do_eval )); then + do_eval=0 + + # When there are no active jobs we can't rely on the CHLD trap to + # manage the coproc lifetime. + close_idle_coproc + fi done } # -# Get results from finnished jobs and pass it to the to callback function. This is the only way to reliably return the -# job name, return code, output and execution time and with minimal effort. +# Get results from finished jobs and pass it to the to callback function. This is the only way to reliably return the +# job name, return code, output and execution time and with minimal effort. +# +# If the async process buffer becomes corrupt, the callback will be invoked with the first argument being `[async]` (job +# name), non-zero return code and fifth argument describing the error (stderr). # # usage: # async_process_results <worker_name> <callback_function> @@ -123,42 +276,64 @@ # $3 = resulting stdout from execution # $4 = execution time, floating point e.g. 2.05 seconds # $5 = resulting stderr from execution +# $6 = has next result in buffer (0 = buffer empty, 1 = yes) # async_process_results() { - setopt localoptions noshwordsplit + setopt localoptions unset noshwordsplit noksharrays noposixidentifiers noposixstrings - integer count=0 local worker=$1 local callback=$2 + local caller=$3 local -a items - local IFS=$'\0' + local null=$'\0' data + integer -l len pos num_processed has_next typeset -gA ASYNC_PROCESS_BUFFER - # Read output from zpty and parse it if available - while zpty -rt $worker line 2>/dev/null; do - # Remove unwanted \r from output - ASYNC_PROCESS_BUFFER[$worker]+=${line//$'\r'$'\n'/$'\n'} - # Split buffer on null characters, preserve empty elements - items=("${(@)=ASYNC_PROCESS_BUFFER[$worker]}") - # Remove last element since it's due to the return string separator structure - items=("${(@)items[1,${#items}-1]}") - - # Continue until we receive all information - (( ${#items} % 5 )) && continue - - # Work through all results - while (( ${#items} > 0 )); do - $callback "${(@)=items[1,5]}" - shift 5 items - count+=1 - done - # Empty the buffer - unset "ASYNC_PROCESS_BUFFER[$worker]" + # Read output from zpty and parse it if available. + while zpty -r -t $worker data 2>/dev/null; do + ASYNC_PROCESS_BUFFER[$worker]+=$data + len=${#ASYNC_PROCESS_BUFFER[$worker]} + pos=${ASYNC_PROCESS_BUFFER[$worker][(i)$null]} # Get index of NULL-character (delimiter). + + # Keep going until we find a NULL-character. + if (( ! len )) || (( pos > len )); then + continue + fi + + while (( pos <= len )); do + # Take the content from the beginning, until the NULL-character and + # perform shell parsing (z) and unquoting (Q) as an array (@). + items=("${(@Q)${(z)ASYNC_PROCESS_BUFFER[$worker][1,$pos-1]}}") + + # Remove the extracted items from the buffer. + ASYNC_PROCESS_BUFFER[$worker]=${ASYNC_PROCESS_BUFFER[$worker][$pos+1,$len]} + + len=${#ASYNC_PROCESS_BUFFER[$worker]} + if (( len > 1 )); then + pos=${ASYNC_PROCESS_BUFFER[$worker][(i)$null]} # Get index of NULL-character (delimiter). + fi + + has_next=$(( len != 0 )) + if (( $#items == 5 )); then + items+=($has_next) + $callback "${(@)items}" # Send all parsed items to the callback. + (( num_processed++ )) + elif [[ -z $items ]]; then + # Empty items occur between results due to double-null ($'\0\0') + # caused by commands being both pre and suffixed with null. + else + # In case of corrupt data, invoke callback with *async* as job + # name, non-zero exit status and an error message on stderr. + $callback "[async]" 1 "" 0 "$0:$LINENO: error: bad format, got ${#items} items (${(q)items})" $has_next + fi + done done - # If we processed any results, return success - (( count )) && return 0 + (( num_processed )) && return 0 + + # Avoid printing exit value when `setopt printexitvalue` is active.` + [[ $caller = trap || $caller = watcher ]] && return 0 # No results were processed return 1 @@ -171,11 +346,49 @@ async_process_results() { local worker=$ASYNC_PTYS[$1] local callback=$ASYNC_CALLBACKS[$worker] + if [[ -n $2 ]]; then + # from man zshzle(1): + # `hup' for a disconnect, `nval' for a closed or otherwise + # invalid descriptor, or `err' for any other condition. + # Systems that support only the `select' system call always use + # `err'. + + # this has the side effect to unregister the broken file descriptor + async_stop_worker $worker + + if [[ -n $callback ]]; then + $callback '[async]' 2 "" 0 "$0:$LINENO: error: fd for $worker failed: zle -F $1 returned error $2" 0 + fi + return + fi; + if [[ -n $callback ]]; then - async_process_results $worker $callback + async_process_results $worker $callback watcher fi } +_async_send_job() { + setopt localoptions noshwordsplit noksharrays noposixidentifiers noposixstrings + + local caller=$1 + local worker=$2 + shift 2 + + zpty -t $worker &>/dev/null || { + typeset -gA ASYNC_CALLBACKS + local callback=$ASYNC_CALLBACKS[$worker] + + if [[ -n $callback ]]; then + $callback '[async]' 3 "" 0 "$0:$LINENO: error: no such worker: $worker" 0 + else + print -u2 "$caller: no such async worker: $worker" + fi + return 1 + } + + zpty -w $worker "$@"$'\0' +} + # # Start a new asynchronous job on specified worker, assumes the worker is running. # @@ -183,18 +396,50 @@ async_process_results() { # async_job <worker_name> <my_function> [<function_params>] # async_job() { - setopt localoptions noshwordsplit + setopt localoptions noshwordsplit noksharrays noposixidentifiers noposixstrings local worker=$1; shift - zpty -w $worker $@ + + local -a cmd + cmd=("$@") + if (( $#cmd > 1 )); then + cmd=(${(q)cmd}) # Quote special characters in multi argument commands. + fi + + _async_send_job $0 $worker "$cmd" +} + +# +# Evaluate a command (like async_job) inside the async worker, then worker environment can be manipulated. For example, +# issuing a cd command will change the PWD of the worker which will then be inherited by all future async jobs. +# +# Output will be returned via callback, job name will be [async/eval]. +# +# usage: +# async_worker_eval <worker_name> <my_function> [<function_params>] +# +async_worker_eval() { + setopt localoptions noshwordsplit noksharrays noposixidentifiers noposixstrings + + local worker=$1; shift + + local -a cmd + cmd=("$@") + if (( $#cmd > 1 )); then + cmd=(${(q)cmd}) # Quote special characters in multi argument commands. + fi + + # Quote the cmd in case RC_EXPAND_PARAM is set. + _async_send_job $0 $worker "_async_eval $cmd" } # This function traps notification signals and calls all registered callbacks _async_notify_trap() { setopt localoptions noshwordsplit + local k for k in ${(k)ASYNC_CALLBACKS}; do - async_process_results $k ${ASYNC_CALLBACKS[$k]} + async_process_results $k ${ASYNC_CALLBACKS[$k]} trap done } @@ -208,13 +453,23 @@ async_job() { async_register_callback() { setopt localoptions noshwordsplit nolocaltraps - typeset -gA ASYNC_CALLBACKS + typeset -gA ASYNC_PTYS ASYNC_CALLBACKS local worker=$1; shift ASYNC_CALLBACKS[$worker]="$*" - if (( ! ASYNC_USE_ZLE_HANDLER )); then + # Enable trap when the ZLE watcher is unavailable, allows + # workers to notify (via -n) when a job is done. + if [[ ! -o interactive ]] || [[ ! -o zle ]]; then trap '_async_notify_trap' WINCH + elif [[ -o interactive ]] && [[ -o zle ]]; then + local fd w + for fd w in ${(@kv)ASYNC_PTYS}; do + if [[ $w == $worker ]]; then + zle -F $fd _async_zle_watcher # Register the ZLE handler. + break + fi + done fi } @@ -246,12 +501,19 @@ async_flush_jobs() { zpty -t $worker &>/dev/null || return 1 # Send kill command to worker - zpty -w $worker "_killjobs" - - # Clear all output buffers - while zpty -r $worker line; do true; done + async_job $worker "_killjobs" + + # Clear the zpty buffer. + local junk + if zpty -r -t $worker junk '*'; then + (( ASYNC_DEBUG )) && print -n "async_flush_jobs $worker: ${(V)junk}" + while zpty -r -t $worker junk '*'; do + (( ASYNC_DEBUG )) && print -n "${(V)junk}" + done + (( ASYNC_DEBUG )) && print + fi - # Clear any partial buffers + # Finally, clear the process buffer in case of partially parsed responses. typeset -gA ASYNC_PROCESS_BUFFER unset "ASYNC_PROCESS_BUFFER[$worker]" } @@ -269,24 +531,70 @@ async_flush_jobs() { # -p pid to notify (defaults to current pid) # async_start_worker() { - setopt localoptions noshwordsplit + setopt localoptions noshwordsplit noclobber local worker=$1; shift + local -a args + args=("$@") zpty -t $worker &>/dev/null && return typeset -gA ASYNC_PTYS typeset -h REPLY - zpty -b $worker _async_worker -p $$ $@ || { + typeset has_xtrace=0 + + if [[ -o interactive ]] && [[ -o zle ]]; then + # Inform the worker to ignore the notify flag and that we're + # using a ZLE watcher instead. + args+=(-z) + + if (( ! ASYNC_ZPTY_RETURNS_FD )); then + # When zpty doesn't return a file descriptor (on older versions of zsh) + # we try to guess it anyway. + integer -l zptyfd + exec {zptyfd}>&1 # Open a new file descriptor (above 10). + exec {zptyfd}>&- # Close it so it's free to be used by zpty. + fi + fi + + # Workaround for stderr in the main shell sometimes (incorrectly) being + # reassigned to /dev/null by the reassignment done inside the async + # worker. + # See https://github.com/mafredri/zsh-async/issues/35. + integer errfd=-1 + exec {errfd}>&2 + + # Make sure async worker is started without xtrace + # (the trace output interferes with the worker). + [[ -o xtrace ]] && { + has_xtrace=1 + unsetopt xtrace + } + + zpty -b $worker _async_worker -p $$ $args 2>&$errfd + local ret=$? + + # Re-enable it if it was enabled, for debugging. + (( has_xtrace )) && setopt xtrace + exec {errfd}>& - + + if (( ret )); then async_stop_worker $worker return 1 - } + fi - if (( ASYNC_USE_ZLE_HANDLER )); then - ASYNC_PTYS[$REPLY]=$worker - zle -F $REPLY _async_zle_watcher + if ! is-at-least 5.0.8; then + # For ZSH versions older than 5.0.8 we delay a bit to give + # time for the worker to start before issuing commands, + # otherwise it will not be ready to receive them. + sleep 0.001 + fi + + if [[ -o interactive ]] && [[ -o zle ]]; then + if (( ! ASYNC_ZPTY_RETURNS_FD )); then + REPLY=$zptyfd # Use the guessed value for the file desciptor. + fi - # If worker was called with -n, disable trap in favor of zle handler - async_job $worker _unset_trap + ASYNC_PTYS[$REPLY]=$worker # Map the file desciptor to the worker. fi } @@ -299,7 +607,7 @@ async_start_worker() { async_stop_worker() { setopt localoptions noshwordsplit - local ret=0 + local ret=0 worker k v for worker in $@; do # Find and unregister the zle handler for the worker for k v in ${(@kv)ASYNC_PTYS}; do @@ -310,6 +618,10 @@ async_stop_worker() { done async_unregister_callback $worker zpty -d $worker 2>/dev/null || ret=$? + + # Clear any partial buffers. + typeset -gA ASYNC_PROCESS_BUFFER + unset "ASYNC_PROCESS_BUFFER[$worker]" done return $ret @@ -323,17 +635,21 @@ async_stop_worker() { # async_init() { (( ASYNC_INIT_DONE )) && return - ASYNC_INIT_DONE=1 + typeset -g ASYNC_INIT_DONE=1 zmodload zsh/zpty zmodload zsh/datetime - # Check if zsh/zpty returns a file descriptor or not, shell must also be interactive - ASYNC_USE_ZLE_HANDLER=0 - [[ -o interactive ]] && { + # Load is-at-least for reliable version check. + autoload -Uz is-at-least + + # Check if zsh/zpty returns a file descriptor or not, + # shell must also be interactive with zle enabled. + typeset -g ASYNC_ZPTY_RETURNS_FD=0 + [[ -o interactive ]] && [[ -o zle ]] && { typeset -h REPLY - zpty _async_test cat - (( REPLY )) && ASYNC_USE_ZLE_HANDLER=1 + zpty _async_test : + (( REPLY )) && ASYNC_ZPTY_RETURNS_FD=1 zpty -d _async_test } }