changeset 496:1944a98a6774

merge
author Augie Fackler <raf@durin42.com>
date Sun, 27 Sep 2020 16:11:36 -0400
parents d3d52766cfcd (diff) 1bb1c44ce49c (current diff)
children 2ea29b487b06
files
diffstat 2 files changed, 435 insertions(+), 118 deletions(-) [+]
line wrap: on
line diff
--- a/.hgignore
+++ b/.hgignore
@@ -31,6 +31,7 @@ hgwebdir\.conf
 ^Misc. Stuff
 ^Movies
 ^Music
+^NoBackup
 ^Pictures
 ^Programming
 ^Public
--- a/.zfun/async
+++ b/.zfun/async
@@ -3,99 +3,221 @@
 #
 # zsh-async
 #
-# version: 1.1.0
+# version: v1.8.4
 # author: Mathias Fredriksson
 # url: https://github.com/mafredri/zsh-async
 #
 
+typeset -g ASYNC_VERSION=1.8.4
+# Produce debug output from zsh-async when set to 1.
+typeset -g ASYNC_DEBUG=${ASYNC_DEBUG:-0}
+
+# Execute commands that can manipulate the environment inside the async worker. Return output via callback.
+_async_eval() {
+	local ASYNC_JOB_NAME
+	# Rename job to _async_eval and redirect all eval output to cat running
+	# in _async_job. Here, stdout and stderr are not separated for
+	# simplicity, this could be improved in the future.
+	{
+		eval "$@"
+	} &> >(ASYNC_JOB_NAME=[async/eval] _async_job 'cat')
+}
+
 # Wrapper for jobs executed by the async worker, gives output in parseable format with execution time
 _async_job() {
-	# Store start time as double precision (+E disables scientific notation)
+	# Disable xtrace as it would mangle the output.
+	setopt localoptions noxtrace
+
+	# Store start time for job.
 	float -F duration=$EPOCHREALTIME
 
-	# Run the command
-	#
-	# What is happening here is that we are assigning stdout, stderr and ret to
-	# variables, and then we are printing out the variable assignment through
-	# typeset -p. This way when we run eval we get something along the lines of:
-	# 	eval "
-	# 		typeset stdout=' M async.test.sh\n M async.zsh'
-	# 		typeset ret=0
-	# 		typeset stderr=''
-	# 	"
-	unset stdout stderr ret
-	eval "$(
+	# Run the command and capture both stdout (`eval`) and stderr (`cat`) in
+	# separate subshells. When the command is complete, we grab write lock
+	# (mutex token) and output everything except stderr inside the command
+	# block, after the command block has completed, the stdin for `cat` is
+	# closed, causing stderr to be appended with a $'\0' at the end to mark the
+	# end of output from this job.
+	local jobname=${ASYNC_JOB_NAME:-$1} out
+	out="$(
+		local stdout stderr ret tok
 		{
 			stdout=$(eval "$@")
 			ret=$?
-			typeset -p stdout ret
-		} 2> >(stderr=$(cat); typeset -p stderr)
-	)"
+			duration=$(( EPOCHREALTIME - duration ))  # Calculate duration.
 
-	# Calculate duration
-	duration=$(( EPOCHREALTIME - duration ))
-
-	# stip all null-characters from stdout and stderr
-	stdout=${stdout//$'\0'/}
-	stderr=${stderr//$'\0'/}
-
-	# if ret is missing for some unknown reason, set it to -1 to indicate we
-	# have run into a bug
-	ret=${ret:--1}
+			print -r -n - $'\0'${(q)jobname} $ret ${(q)stdout} $duration
+		} 2> >(stderr=$(cat) && print -r -n - " "${(q)stderr}$'\0')
+	)"
+	if [[ $out != $'\0'*$'\0' ]]; then
+		# Corrupted output (aborted job?), skipping.
+		return
+	fi
 
-	# Grab mutex lock
-	read -ep >/dev/null
+	# Grab mutex lock, stalls until token is available.
+	read -r -k 1 -p tok || return 1
 
-	# return output (<job_name> <return_code> <stdout> <duration> <stderr>)
-	print -r -N -n -- "$1" "$ret" "$stdout" "$duration" "$stderr"$'\0'
+	# Return output (<job_name> <return_code> <stdout> <duration> <stderr>).
+	print -r -n - "$out"
 
-	# Unlock mutex
-	print -p "t"
+	# Unlock mutex by inserting a token.
+	print -n -p $tok
 }
 
 # The background worker manages all tasks and runs them without interfering with other processes
 _async_worker() {
+	# Reset all options to defaults inside async worker.
+	emulate -R zsh
+
+	# Make sure monitor is unset to avoid printing the
+	# pids of child processes.
+	unsetopt monitor
+
+	# Redirect stderr to `/dev/null` in case unforseen errors produced by the
+	# worker. For example: `fork failed: resource temporarily unavailable`.
+	# Some older versions of zsh might also print malloc errors (know to happen
+	# on at least zsh 5.0.2 and 5.0.8) likely due to kill signals.
+	exec 2>/dev/null
+
+	# When a zpty is deleted (using -d) all the zpty instances created before
+	# the one being deleted receive a SIGHUP, unless we catch it, the async
+	# worker would simply exit (stop working) even though visible in the list
+	# of zpty's (zpty -L). This has been fixed around the time of Zsh 5.4
+	# (not released).
+	if ! is-at-least 5.4.1; then
+		TRAPHUP() {
+			return 0  # Return 0, indicating signal was handled.
+		}
+	fi
+
 	local -A storage
 	local unique=0
+	local notify_parent=0
+	local parent_pid=0
+	local coproc_pid=0
+	local processing=0
+
+	local -a zsh_hooks zsh_hook_functions
+	zsh_hooks=(chpwd periodic precmd preexec zshexit zshaddhistory)
+	zsh_hook_functions=(${^zsh_hooks}_functions)
+	unfunction $zsh_hooks &>/dev/null   # Deactivate all zsh hooks inside the worker.
+	unset $zsh_hook_functions           # And hooks with registered functions.
+	unset zsh_hooks zsh_hook_functions  # Cleanup.
+
+	close_idle_coproc() {
+		local -a pids
+		pids=(${${(v)jobstates##*:*:}%\=*})
+
+		# If coproc (cat) is the only child running, we close it to avoid
+		# leaving it running indefinitely and cluttering the process tree.
+		if  (( ! processing )) && [[ $#pids = 1 ]] && [[ $coproc_pid = $pids[1] ]]; then
+			coproc :
+			coproc_pid=0
+		fi
+	}
+
+	child_exit() {
+		close_idle_coproc
+
+		# On older version of zsh (pre 5.2) we notify the parent through a
+		# SIGWINCH signal because `zpty` did not return a file descriptor (fd)
+		# prior to that.
+		if (( notify_parent )); then
+			# We use SIGWINCH for compatibility with older versions of zsh
+			# (pre 5.1.1) where other signals (INFO, ALRM, USR1, etc.) could
+			# cause a deadlock in the shell under certain circumstances.
+			kill -WINCH $parent_pid
+		fi
+	}
 
-	# Process option parameters passed to worker
-	while getopts "np:u" opt; do
+	# Register a SIGCHLD trap to handle the completion of child processes.
+	trap child_exit CHLD
+
+	# Process option parameters passed to worker.
+	while getopts "np:uz" opt; do
 		case $opt in
-			# Use SIGWINCH since many others seem to cause zsh to freeze, e.g. ALRM, INFO, etc.
-			n) trap 'kill -WINCH $ASYNC_WORKER_PARENT_PID' CHLD;;
-			p) ASYNC_WORKER_PARENT_PID=$OPTARG;;
+			n) notify_parent=1;;
+			p) parent_pid=$OPTARG;;
 			u) unique=1;;
+			z) notify_parent=0;;  # Uses ZLE watcher instead.
 		esac
 	done
 
-	# Create a mutex for writing to the terminal through coproc
-	coproc cat
-	# Insert token into coproc
-	print -p "t"
+	# Terminate all running jobs, note that this function does not
+	# reinstall the child trap.
+	terminate_jobs() {
+		trap - CHLD   # Ignore child exits during kill.
+		coproc :      # Quit coproc.
+		coproc_pid=0  # Reset pid.
+
+		if is-at-least 5.4.1; then
+			trap '' HUP    # Catch the HUP sent to this process.
+			kill -HUP -$$  # Send to entire process group.
+			trap - HUP     # Disable HUP trap.
+		else
+			# We already handle HUP for Zsh < 5.4.1.
+			kill -HUP -$$  # Send to entire process group.
+		fi
+	}
 
-	while read -r cmd; do
-		# Separate on spaces into an array
-		cmd=(${=cmd})
-		local job=$cmd[1]
+	killjobs() {
+		local tok
+		local -a pids
+		pids=(${${(v)jobstates##*:*:}%\=*})
+
+		# No need to send SIGHUP if no jobs are running.
+		(( $#pids == 0 )) && continue
+		(( $#pids == 1 )) && [[ $coproc_pid = $pids[1] ]] && continue
+
+		# Grab lock to prevent half-written output in case a child
+		# process is in the middle of writing to stdin during kill.
+		(( coproc_pid )) && read -r -k 1 -p tok
+
+		terminate_jobs
+		trap child_exit CHLD  # Reinstall child trap.
+	}
+
+	local request do_eval=0
+	local -a cmd
+	while :; do
+		# Wait for jobs sent by async_job.
+		read -r -d $'\0' request || {
+			# Unknown error occurred while reading from stdin, the zpty
+			# worker is likely in a broken state, so we shut down.
+			terminate_jobs
+
+			# Stdin is broken and in case this was an unintended
+			# crash, we try to report it as a last hurrah.
+			print -r -n $'\0'"'[async]'" $(( 127 + 3 )) "''" 0 "'$0:$LINENO: zpty fd died, exiting'"$'\0'
+
+			# We use `return` to abort here because using `exit` may
+			# result in an infinite loop that never exits and, as a
+			# result, high CPU utilization.
+			return $(( 127 + 1 ))
+		}
+
+		# We need to clean the input here because sometimes when a zpty
+		# has died and been respawned, messages will be prefixed with a
+		# carraige return (\r, or \C-M).
+		request=${request#$'\C-M'}
 
 		# Check for non-job commands sent to worker
-		case $job in
-			_unset_trap)
-				trap - CHLD; continue;;
-			_killjobs)
-				# Do nothing in the worker when receiving the TERM signal
-				trap '' TERM
-				# Send TERM to the entire process group (PID and all children)
-				kill -TERM -$$ &>/dev/null
-				# Reset trap
-				trap - TERM
-				continue
-				;;
+		case $request in
+			_killjobs)    killjobs; continue;;
+			_async_eval*) do_eval=1;;
 		esac
 
-		# If worker should perform unique jobs
-		if (( unique )); then
-			# Check if a previous job is still running, if yes, let it finnish
+		# Parse the request using shell parsing (z) to allow commands
+		# to be parsed from single strings and multi-args alike.
+		cmd=("${(z)request}")
+
+		# Name of the job (first argument).
+		local job=$cmd[1]
+
+		# Check if a worker should perform unique jobs, unless
+		# this is an eval since they run synchronously.
+		if (( !do_eval )) && (( unique )); then
+			# Check if a previous job is still running, if yes,
+			# skip this job and let the previous one finish.
 			for pid in ${${(v)jobstates##*:*:}%\=*}; do
 				if [[ ${storage[$job]} == $pid ]]; then
 					continue 2
@@ -103,16 +225,47 @@
 			done
 		fi
 
-		# Run task in background
-		_async_job $cmd &
-		# Store pid because zsh job manager is extremely unflexible (show jobname as non-unique '$job')...
-		storage[$job]=$!
+		# Guard against closing coproc from trap before command has started.
+		processing=1
+
+		# Because we close the coproc after the last job has completed, we must
+		# recreate it when there are no other jobs running.
+		if (( ! coproc_pid )); then
+			# Use coproc as a mutex for synchronized output between children.
+			coproc cat
+			coproc_pid="$!"
+			# Insert token into coproc
+			print -n -p "t"
+		fi
+
+		if (( do_eval )); then
+			shift cmd  # Strip _async_eval from cmd.
+			_async_eval $cmd
+		else
+			# Run job in background, completed jobs are printed to stdout.
+			_async_job $cmd &
+			# Store pid because zsh job manager is extremely unflexible (show jobname as non-unique '$job')...
+			storage[$job]="$!"
+		fi
+
+		processing=0  # Disable guard.
+
+		if (( do_eval )); then
+			do_eval=0
+
+			# When there are no active jobs we can't rely on the CHLD trap to
+			# manage the coproc lifetime.
+			close_idle_coproc
+		fi
 	done
 }
 
 #
-#  Get results from finnished jobs and pass it to the to callback function. This is the only way to reliably return the
-#  job name, return code, output and execution time and with minimal effort.
+# Get results from finished jobs and pass it to the to callback function. This is the only way to reliably return the
+# job name, return code, output and execution time and with minimal effort.
+#
+# If the async process buffer becomes corrupt, the callback will be invoked with the first argument being `[async]` (job
+# name), non-zero return code and fifth argument describing the error (stderr).
 #
 # usage:
 # 	async_process_results <worker_name> <callback_function>
@@ -123,42 +276,64 @@
 # 	$3 = resulting stdout from execution
 # 	$4 = execution time, floating point e.g. 2.05 seconds
 # 	$5 = resulting stderr from execution
+#	$6 = has next result in buffer (0 = buffer empty, 1 = yes)
 #
 async_process_results() {
-	setopt localoptions noshwordsplit
+	setopt localoptions unset noshwordsplit noksharrays noposixidentifiers noposixstrings
 
-	integer count=0
 	local worker=$1
 	local callback=$2
+	local caller=$3
 	local -a items
-	local IFS=$'\0'
+	local null=$'\0' data
+	integer -l len pos num_processed has_next
 
 	typeset -gA ASYNC_PROCESS_BUFFER
-	# Read output from zpty and parse it if available
-	while zpty -rt $worker line 2>/dev/null; do
-		# Remove unwanted \r from output
-		ASYNC_PROCESS_BUFFER[$worker]+=${line//$'\r'$'\n'/$'\n'}
-		# Split buffer on null characters, preserve empty elements
-		items=("${(@)=ASYNC_PROCESS_BUFFER[$worker]}")
-		# Remove last element since it's due to the return string separator structure
-		items=("${(@)items[1,${#items}-1]}")
-
-		# Continue until we receive all information
-		(( ${#items} % 5 )) && continue
-
-		# Work through all results
-		while (( ${#items} > 0 )); do
-			$callback "${(@)=items[1,5]}"
-			shift 5 items
-			count+=1
-		done
 
-		# Empty the buffer
-		unset "ASYNC_PROCESS_BUFFER[$worker]"
+	# Read output from zpty and parse it if available.
+	while zpty -r -t $worker data 2>/dev/null; do
+		ASYNC_PROCESS_BUFFER[$worker]+=$data
+		len=${#ASYNC_PROCESS_BUFFER[$worker]}
+		pos=${ASYNC_PROCESS_BUFFER[$worker][(i)$null]}  # Get index of NULL-character (delimiter).
+
+		# Keep going until we find a NULL-character.
+		if (( ! len )) || (( pos > len )); then
+			continue
+		fi
+
+		while (( pos <= len )); do
+			# Take the content from the beginning, until the NULL-character and
+			# perform shell parsing (z) and unquoting (Q) as an array (@).
+			items=("${(@Q)${(z)ASYNC_PROCESS_BUFFER[$worker][1,$pos-1]}}")
+
+			# Remove the extracted items from the buffer.
+			ASYNC_PROCESS_BUFFER[$worker]=${ASYNC_PROCESS_BUFFER[$worker][$pos+1,$len]}
+
+			len=${#ASYNC_PROCESS_BUFFER[$worker]}
+			if (( len > 1 )); then
+				pos=${ASYNC_PROCESS_BUFFER[$worker][(i)$null]}  # Get index of NULL-character (delimiter).
+			fi
+
+			has_next=$(( len != 0 ))
+			if (( $#items == 5 )); then
+				items+=($has_next)
+				$callback "${(@)items}"  # Send all parsed items to the callback.
+				(( num_processed++ ))
+			elif [[ -z $items ]]; then
+				# Empty items occur between results due to double-null ($'\0\0')
+				# caused by commands being both pre and suffixed with null.
+			else
+				# In case of corrupt data, invoke callback with *async* as job
+				# name, non-zero exit status and an error message on stderr.
+				$callback "[async]" 1 "" 0 "$0:$LINENO: error: bad format, got ${#items} items (${(q)items})" $has_next
+			fi
+		done
 	done
 
-	# If we processed any results, return success
-	(( count )) && return 0
+	(( num_processed )) && return 0
+
+	# Avoid printing exit value when `setopt printexitvalue` is active.`
+	[[ $caller = trap || $caller = watcher ]] && return 0
 
 	# No results were processed
 	return 1
@@ -171,11 +346,49 @@ async_process_results() {
 	local worker=$ASYNC_PTYS[$1]
 	local callback=$ASYNC_CALLBACKS[$worker]
 
+	if [[ -n $2 ]]; then
+		# from man zshzle(1):
+		# `hup' for a disconnect, `nval' for a closed or otherwise
+		# invalid descriptor, or `err' for any other condition.
+		# Systems that support only the `select' system call always use
+		# `err'.
+
+		# this has the side effect to unregister the broken file descriptor
+		async_stop_worker $worker
+
+		if [[ -n $callback ]]; then
+			$callback '[async]' 2 "" 0 "$0:$LINENO: error: fd for $worker failed: zle -F $1 returned error $2" 0
+		fi
+		return
+	fi;
+
 	if [[ -n $callback ]]; then
-		async_process_results $worker $callback
+		async_process_results $worker $callback watcher
 	fi
 }
 
+_async_send_job() {
+	setopt localoptions noshwordsplit noksharrays noposixidentifiers noposixstrings
+
+	local caller=$1
+	local worker=$2
+	shift 2
+
+	zpty -t $worker &>/dev/null || {
+		typeset -gA ASYNC_CALLBACKS
+		local callback=$ASYNC_CALLBACKS[$worker]
+
+		if [[ -n $callback ]]; then
+			$callback '[async]' 3 "" 0 "$0:$LINENO: error: no such worker: $worker" 0
+		else
+			print -u2 "$caller: no such async worker: $worker"
+		fi
+		return 1
+	}
+
+	zpty -w $worker "$@"$'\0'
+}
+
 #
 # Start a new asynchronous job on specified worker, assumes the worker is running.
 #
@@ -183,18 +396,50 @@ async_process_results() {
 # 	async_job <worker_name> <my_function> [<function_params>]
 #
 async_job() {
-	setopt localoptions noshwordsplit
+	setopt localoptions noshwordsplit noksharrays noposixidentifiers noposixstrings
 
 	local worker=$1; shift
-	zpty -w $worker $@
+
+	local -a cmd
+	cmd=("$@")
+	if (( $#cmd > 1 )); then
+		cmd=(${(q)cmd})  # Quote special characters in multi argument commands.
+	fi
+
+	_async_send_job $0 $worker "$cmd"
+}
+
+#
+# Evaluate a command (like async_job) inside the async worker, then worker environment can be manipulated. For example,
+# issuing a cd command will change the PWD of the worker which will then be inherited by all future async jobs.
+#
+# Output will be returned via callback, job name will be [async/eval].
+#
+# usage:
+# 	async_worker_eval <worker_name> <my_function> [<function_params>]
+#
+async_worker_eval() {
+	setopt localoptions noshwordsplit noksharrays noposixidentifiers noposixstrings
+
+	local worker=$1; shift
+
+	local -a cmd
+	cmd=("$@")
+	if (( $#cmd > 1 )); then
+		cmd=(${(q)cmd})  # Quote special characters in multi argument commands.
+	fi
+
+	# Quote the cmd in case RC_EXPAND_PARAM is set.
+	_async_send_job $0 $worker "_async_eval $cmd"
 }
 
 # This function traps notification signals and calls all registered callbacks
 _async_notify_trap() {
 	setopt localoptions noshwordsplit
 
+	local k
 	for k in ${(k)ASYNC_CALLBACKS}; do
-		async_process_results $k ${ASYNC_CALLBACKS[$k]}
+		async_process_results $k ${ASYNC_CALLBACKS[$k]} trap
 	done
 }
 
@@ -208,13 +453,23 @@ async_job() {
 async_register_callback() {
 	setopt localoptions noshwordsplit nolocaltraps
 
-	typeset -gA ASYNC_CALLBACKS
+	typeset -gA ASYNC_PTYS ASYNC_CALLBACKS
 	local worker=$1; shift
 
 	ASYNC_CALLBACKS[$worker]="$*"
 
-	if (( ! ASYNC_USE_ZLE_HANDLER )); then
+	# Enable trap when the ZLE watcher is unavailable, allows
+	# workers to notify (via -n) when a job is done.
+	if [[ ! -o interactive ]] || [[ ! -o zle ]]; then
 		trap '_async_notify_trap' WINCH
+	elif [[ -o interactive ]] && [[ -o zle ]]; then
+		local fd w
+		for fd w in ${(@kv)ASYNC_PTYS}; do
+			if [[ $w == $worker ]]; then
+				zle -F $fd _async_zle_watcher  # Register the ZLE handler.
+				break
+			fi
+		done
 	fi
 }
 
@@ -246,12 +501,19 @@ async_flush_jobs() {
 	zpty -t $worker &>/dev/null || return 1
 
 	# Send kill command to worker
-	zpty -w $worker "_killjobs"
-
-	# Clear all output buffers
-	while zpty -r $worker line; do true; done
+	async_job $worker "_killjobs"
+
+	# Clear the zpty buffer.
+	local junk
+	if zpty -r -t $worker junk '*'; then
+		(( ASYNC_DEBUG )) && print -n "async_flush_jobs $worker: ${(V)junk}"
+		while zpty -r -t $worker junk '*'; do
+			(( ASYNC_DEBUG )) && print -n "${(V)junk}"
+		done
+		(( ASYNC_DEBUG )) && print
+	fi
 
-	# Clear any partial buffers
+	# Finally, clear the process buffer in case of partially parsed responses.
 	typeset -gA ASYNC_PROCESS_BUFFER
 	unset "ASYNC_PROCESS_BUFFER[$worker]"
 }
@@ -269,24 +531,70 @@ async_flush_jobs() {
 # 	-p pid to notify (defaults to current pid)
 #
 async_start_worker() {
-	setopt localoptions noshwordsplit
+	setopt localoptions noshwordsplit noclobber
 
 	local worker=$1; shift
+	local -a args
+	args=("$@")
 	zpty -t $worker &>/dev/null && return
 
 	typeset -gA ASYNC_PTYS
 	typeset -h REPLY
-	zpty -b $worker _async_worker -p $$ $@ || {
+	typeset has_xtrace=0
+
+	if [[ -o interactive ]] && [[ -o zle ]]; then
+		# Inform the worker to ignore the notify flag and that we're
+		# using a ZLE watcher instead.
+		args+=(-z)
+
+		if (( ! ASYNC_ZPTY_RETURNS_FD )); then
+			# When zpty doesn't return a file descriptor (on older versions of zsh)
+			# we try to guess it anyway.
+			integer -l zptyfd
+			exec {zptyfd}>&1  # Open a new file descriptor (above 10).
+			exec {zptyfd}>&-  # Close it so it's free to be used by zpty.
+		fi
+	fi
+
+	# Workaround for stderr in the main shell sometimes (incorrectly) being
+	# reassigned to /dev/null by the reassignment done inside the async
+	# worker.
+	# See https://github.com/mafredri/zsh-async/issues/35.
+	integer errfd=-1
+	exec {errfd}>&2
+
+	# Make sure async worker is started without xtrace
+	# (the trace output interferes with the worker).
+	[[ -o xtrace ]] && {
+		has_xtrace=1
+		unsetopt xtrace
+	}
+
+	zpty -b $worker _async_worker -p $$ $args 2>&$errfd
+	local ret=$?
+
+	# Re-enable it if it was enabled, for debugging.
+	(( has_xtrace )) && setopt xtrace
+	exec {errfd}>& -
+
+	if (( ret )); then
 		async_stop_worker $worker
 		return 1
-	}
+	fi
 
-	if (( ASYNC_USE_ZLE_HANDLER )); then
-		ASYNC_PTYS[$REPLY]=$worker
-		zle -F $REPLY _async_zle_watcher
+	if ! is-at-least 5.0.8; then
+		# For ZSH versions older than 5.0.8 we delay a bit to give
+		# time for the worker to start before issuing commands,
+		# otherwise it will not be ready to receive them.
+		sleep 0.001
+	fi
+
+	if [[ -o interactive ]] && [[ -o zle ]]; then
+		if (( ! ASYNC_ZPTY_RETURNS_FD )); then
+			REPLY=$zptyfd  # Use the guessed value for the file desciptor.
+		fi
 
-		# If worker was called with -n, disable trap in favor of zle handler
-		async_job $worker _unset_trap
+		ASYNC_PTYS[$REPLY]=$worker  # Map the file desciptor to the worker.
 	fi
 }
 
@@ -299,7 +607,7 @@ async_start_worker() {
 async_stop_worker() {
 	setopt localoptions noshwordsplit
 
-	local ret=0
+	local ret=0 worker k v
 	for worker in $@; do
 		# Find and unregister the zle handler for the worker
 		for k v in ${(@kv)ASYNC_PTYS}; do
@@ -310,6 +618,10 @@ async_stop_worker() {
 		done
 		async_unregister_callback $worker
 		zpty -d $worker 2>/dev/null || ret=$?
+
+		# Clear any partial buffers.
+		typeset -gA ASYNC_PROCESS_BUFFER
+		unset "ASYNC_PROCESS_BUFFER[$worker]"
 	done
 
 	return $ret
@@ -323,17 +635,21 @@ async_stop_worker() {
 #
 async_init() {
 	(( ASYNC_INIT_DONE )) && return
-	ASYNC_INIT_DONE=1
+	typeset -g ASYNC_INIT_DONE=1
 
 	zmodload zsh/zpty
 	zmodload zsh/datetime
 
-	# Check if zsh/zpty returns a file descriptor or not, shell must also be interactive
-	ASYNC_USE_ZLE_HANDLER=0
-	[[ -o interactive ]] && {
+	# Load is-at-least for reliable version check.
+	autoload -Uz is-at-least
+
+	# Check if zsh/zpty returns a file descriptor or not,
+	# shell must also be interactive with zle enabled.
+	typeset -g ASYNC_ZPTY_RETURNS_FD=0
+	[[ -o interactive ]] && [[ -o zle ]] && {
 		typeset -h REPLY
-		zpty _async_test cat
-		(( REPLY )) && ASYNC_USE_ZLE_HANDLER=1
+		zpty _async_test :
+		(( REPLY )) && ASYNC_ZPTY_RETURNS_FD=1
 		zpty -d _async_test
 	}
 }