comparison .zfun/async @ 381:e84b6da69ea0

async.zsh: import Imported from https://raw.githubusercontent.com/mafredri/zsh-async/master/async.zsh at git revision 032703d.
author Augie Fackler <raf@durin42.com>
date Thu, 10 Mar 2016 18:58:05 -0500
parents
children d3d52766cfcd
comparison
equal deleted inserted replaced
380:0ceb8554801e 381:e84b6da69ea0
1 #!/usr/bin/env zsh
2
3 #
4 # zsh-async
5 #
6 # version: 1.1.0
7 # author: Mathias Fredriksson
8 # url: https://github.com/mafredri/zsh-async
9 #
10
11 # Wrapper for jobs executed by the async worker, gives output in parseable format with execution time
12 _async_job() {
13 # Store start time as double precision (+E disables scientific notation)
14 float -F duration=$EPOCHREALTIME
15
16 # Run the command
17 #
18 # What is happening here is that we are assigning stdout, stderr and ret to
19 # variables, and then we are printing out the variable assignment through
20 # typeset -p. This way when we run eval we get something along the lines of:
21 # eval "
22 # typeset stdout=' M async.test.sh\n M async.zsh'
23 # typeset ret=0
24 # typeset stderr=''
25 # "
26 unset stdout stderr ret
27 eval "$(
28 {
29 stdout=$(eval "$@")
30 ret=$?
31 typeset -p stdout ret
32 } 2> >(stderr=$(cat); typeset -p stderr)
33 )"
34
35 # Calculate duration
36 duration=$(( EPOCHREALTIME - duration ))
37
38 # stip all null-characters from stdout and stderr
39 stdout=${stdout//$'\0'/}
40 stderr=${stderr//$'\0'/}
41
42 # if ret is missing for some unknown reason, set it to -1 to indicate we
43 # have run into a bug
44 ret=${ret:--1}
45
46 # Grab mutex lock
47 read -ep >/dev/null
48
49 # return output (<job_name> <return_code> <stdout> <duration> <stderr>)
50 print -r -N -n -- "$1" "$ret" "$stdout" "$duration" "$stderr"$'\0'
51
52 # Unlock mutex
53 print -p "t"
54 }
55
56 # The background worker manages all tasks and runs them without interfering with other processes
57 _async_worker() {
58 local -A storage
59 local unique=0
60
61 # Process option parameters passed to worker
62 while getopts "np:u" opt; do
63 case $opt in
64 # Use SIGWINCH since many others seem to cause zsh to freeze, e.g. ALRM, INFO, etc.
65 n) trap 'kill -WINCH $ASYNC_WORKER_PARENT_PID' CHLD;;
66 p) ASYNC_WORKER_PARENT_PID=$OPTARG;;
67 u) unique=1;;
68 esac
69 done
70
71 # Create a mutex for writing to the terminal through coproc
72 coproc cat
73 # Insert token into coproc
74 print -p "t"
75
76 while read -r cmd; do
77 # Separate on spaces into an array
78 cmd=(${=cmd})
79 local job=$cmd[1]
80
81 # Check for non-job commands sent to worker
82 case $job in
83 _unset_trap)
84 trap - CHLD; continue;;
85 _killjobs)
86 # Do nothing in the worker when receiving the TERM signal
87 trap '' TERM
88 # Send TERM to the entire process group (PID and all children)
89 kill -TERM -$$ &>/dev/null
90 # Reset trap
91 trap - TERM
92 continue
93 ;;
94 esac
95
96 # If worker should perform unique jobs
97 if (( unique )); then
98 # Check if a previous job is still running, if yes, let it finnish
99 for pid in ${${(v)jobstates##*:*:}%\=*}; do
100 if [[ ${storage[$job]} == $pid ]]; then
101 continue 2
102 fi
103 done
104 fi
105
106 # Run task in background
107 _async_job $cmd &
108 # Store pid because zsh job manager is extremely unflexible (show jobname as non-unique '$job')...
109 storage[$job]=$!
110 done
111 }
112
113 #
114 # Get results from finnished jobs and pass it to the to callback function. This is the only way to reliably return the
115 # job name, return code, output and execution time and with minimal effort.
116 #
117 # usage:
118 # async_process_results <worker_name> <callback_function>
119 #
120 # callback_function is called with the following parameters:
121 # $1 = job name, e.g. the function passed to async_job
122 # $2 = return code
123 # $3 = resulting stdout from execution
124 # $4 = execution time, floating point e.g. 2.05 seconds
125 # $5 = resulting stderr from execution
126 #
127 async_process_results() {
128 setopt localoptions noshwordsplit
129
130 integer count=0
131 local worker=$1
132 local callback=$2
133 local -a items
134 local IFS=$'\0'
135
136 typeset -gA ASYNC_PROCESS_BUFFER
137 # Read output from zpty and parse it if available
138 while zpty -rt $worker line 2>/dev/null; do
139 # Remove unwanted \r from output
140 ASYNC_PROCESS_BUFFER[$worker]+=${line//$'\r'$'\n'/$'\n'}
141 # Split buffer on null characters, preserve empty elements
142 items=("${(@)=ASYNC_PROCESS_BUFFER[$worker]}")
143 # Remove last element since it's due to the return string separator structure
144 items=("${(@)items[1,${#items}-1]}")
145
146 # Continue until we receive all information
147 (( ${#items} % 5 )) && continue
148
149 # Work through all results
150 while (( ${#items} > 0 )); do
151 $callback "${(@)=items[1,5]}"
152 shift 5 items
153 count+=1
154 done
155
156 # Empty the buffer
157 unset "ASYNC_PROCESS_BUFFER[$worker]"
158 done
159
160 # If we processed any results, return success
161 (( count )) && return 0
162
163 # No results were processed
164 return 1
165 }
166
167 # Watch worker for output
168 _async_zle_watcher() {
169 setopt localoptions noshwordsplit
170 typeset -gA ASYNC_PTYS ASYNC_CALLBACKS
171 local worker=$ASYNC_PTYS[$1]
172 local callback=$ASYNC_CALLBACKS[$worker]
173
174 if [[ -n $callback ]]; then
175 async_process_results $worker $callback
176 fi
177 }
178
179 #
180 # Start a new asynchronous job on specified worker, assumes the worker is running.
181 #
182 # usage:
183 # async_job <worker_name> <my_function> [<function_params>]
184 #
185 async_job() {
186 setopt localoptions noshwordsplit
187
188 local worker=$1; shift
189 zpty -w $worker $@
190 }
191
192 # This function traps notification signals and calls all registered callbacks
193 _async_notify_trap() {
194 setopt localoptions noshwordsplit
195
196 for k in ${(k)ASYNC_CALLBACKS}; do
197 async_process_results $k ${ASYNC_CALLBACKS[$k]}
198 done
199 }
200
201 #
202 # Register a callback for completed jobs. As soon as a job is finnished, async_process_results will be called with the
203 # specified callback function. This requires that a worker is initialized with the -n (notify) option.
204 #
205 # usage:
206 # async_register_callback <worker_name> <callback_function>
207 #
208 async_register_callback() {
209 setopt localoptions noshwordsplit nolocaltraps
210
211 typeset -gA ASYNC_CALLBACKS
212 local worker=$1; shift
213
214 ASYNC_CALLBACKS[$worker]="$*"
215
216 if (( ! ASYNC_USE_ZLE_HANDLER )); then
217 trap '_async_notify_trap' WINCH
218 fi
219 }
220
221 #
222 # Unregister the callback for a specific worker.
223 #
224 # usage:
225 # async_unregister_callback <worker_name>
226 #
227 async_unregister_callback() {
228 typeset -gA ASYNC_CALLBACKS
229
230 unset "ASYNC_CALLBACKS[$1]"
231 }
232
233 #
234 # Flush all current jobs running on a worker. This will terminate any and all running processes under the worker, use
235 # with caution.
236 #
237 # usage:
238 # async_flush_jobs <worker_name>
239 #
240 async_flush_jobs() {
241 setopt localoptions noshwordsplit
242
243 local worker=$1; shift
244
245 # Check if the worker exists
246 zpty -t $worker &>/dev/null || return 1
247
248 # Send kill command to worker
249 zpty -w $worker "_killjobs"
250
251 # Clear all output buffers
252 while zpty -r $worker line; do true; done
253
254 # Clear any partial buffers
255 typeset -gA ASYNC_PROCESS_BUFFER
256 unset "ASYNC_PROCESS_BUFFER[$worker]"
257 }
258
259 #
260 # Start a new async worker with optional parameters, a worker can be told to only run unique tasks and to notify a
261 # process when tasks are complete.
262 #
263 # usage:
264 # async_start_worker <worker_name> [-u] [-n] [-p <pid>]
265 #
266 # opts:
267 # -u unique (only unique job names can run)
268 # -n notify through SIGWINCH signal
269 # -p pid to notify (defaults to current pid)
270 #
271 async_start_worker() {
272 setopt localoptions noshwordsplit
273
274 local worker=$1; shift
275 zpty -t $worker &>/dev/null && return
276
277 typeset -gA ASYNC_PTYS
278 typeset -h REPLY
279 zpty -b $worker _async_worker -p $$ $@ || {
280 async_stop_worker $worker
281 return 1
282 }
283
284 if (( ASYNC_USE_ZLE_HANDLER )); then
285 ASYNC_PTYS[$REPLY]=$worker
286 zle -F $REPLY _async_zle_watcher
287
288 # If worker was called with -n, disable trap in favor of zle handler
289 async_job $worker _unset_trap
290 fi
291 }
292
293 #
294 # Stop one or multiple workers that are running, all unfetched and incomplete work will be lost.
295 #
296 # usage:
297 # async_stop_worker <worker_name_1> [<worker_name_2>]
298 #
299 async_stop_worker() {
300 setopt localoptions noshwordsplit
301
302 local ret=0
303 for worker in $@; do
304 # Find and unregister the zle handler for the worker
305 for k v in ${(@kv)ASYNC_PTYS}; do
306 if [[ $v == $worker ]]; then
307 zle -F $k
308 unset "ASYNC_PTYS[$k]"
309 fi
310 done
311 async_unregister_callback $worker
312 zpty -d $worker 2>/dev/null || ret=$?
313 done
314
315 return $ret
316 }
317
318 #
319 # Initialize the required modules for zsh-async. To be called before using the zsh-async library.
320 #
321 # usage:
322 # async_init
323 #
324 async_init() {
325 (( ASYNC_INIT_DONE )) && return
326 ASYNC_INIT_DONE=1
327
328 zmodload zsh/zpty
329 zmodload zsh/datetime
330
331 # Check if zsh/zpty returns a file descriptor or not, shell must also be interactive
332 ASYNC_USE_ZLE_HANDLER=0
333 [[ -o interactive ]] && {
334 typeset -h REPLY
335 zpty _async_test cat
336 (( REPLY )) && ASYNC_USE_ZLE_HANDLER=1
337 zpty -d _async_test
338 }
339 }
340
341 async() {
342 async_init
343 }
344
345 async "$@"