294 lines
11 KiB
Python
Executable file
294 lines
11 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
import sys
|
|
import urwid
|
|
import urwid_readline
|
|
import os
|
|
import subprocess as sp
|
|
import shutil
|
|
import shlex
|
|
import select
|
|
import platform
|
|
import re
|
|
|
|
VERSION = 'v0.1.3 (2020-09-25)'
|
|
|
|
PROMPT = 'jq> '
|
|
PAUSED_PROMPT_A = '||'
|
|
PAUSED_PROMPT_B = '> '
|
|
|
|
IS_WSL = "Microsoft" in platform.platform()
|
|
|
|
palette = [
|
|
('prompt_ok', 'light green,bold', 'default'),
|
|
('prompt_paused', 'yellow,bold', 'default'),
|
|
('prompt_err', 'light red,bold', 'default'),
|
|
('inp_plain', 'bold', 'default'),
|
|
('body_plain', '', 'default'),
|
|
('err_bar', 'light red,bold', 'default'),
|
|
]
|
|
|
|
class JqManager:
|
|
def __init__(self, inp_file, loop):
|
|
self.inp_file = inp_file
|
|
self.loop = loop
|
|
|
|
self.loop.event_loop.watch_file(self.inp_file.fileno(), self._file_avail_cb)
|
|
self.inp_data = ''
|
|
self.last_out_data = ''
|
|
self.out_data = ''
|
|
self.out_err = ''
|
|
self.scroll_line = 0
|
|
|
|
self.paused = False
|
|
self.prompt_ok = True
|
|
self.is_inp_data_done = False
|
|
self._jq_path = shutil.which('jq')
|
|
if not self._jq_path:
|
|
try:
|
|
orig_stdout.write('jq does not seem to be installed\nPerhaps you want: sudo apt install jq\n'.encode())
|
|
except BrokenPipeError:
|
|
sys.stderr.write('jq does not seem to be installed\nPerhaps you want: sudo apt install jq\n')
|
|
exit(1)
|
|
self.jq_proc = None
|
|
self.respawn_jq(None, inp.get_edit_text())
|
|
|
|
urwid.connect_signal(inp, 'change', self.respawn_jq)
|
|
|
|
def _file_avail_cb(self):
|
|
chunk = os.read(orig_stdin.fileno(), 1024).decode()
|
|
if len(chunk) != 0:
|
|
self.inp_data += chunk
|
|
try:
|
|
self.jq_proc.stdin.write(chunk.encode())
|
|
except ValueError:
|
|
# if `self.jq_proc.stdin` was closed
|
|
pass
|
|
else:
|
|
self.loop.event_loop.remove_watch_file(orig_stdin.fileno())
|
|
self.is_inp_data_done = True
|
|
self.jq_proc.stdin.close()
|
|
|
|
def toggle_pause(self):
|
|
self.paused = not self.paused
|
|
if self.out_data != '':
|
|
self.last_out_data = self.out_data
|
|
if self.prompt_ok:
|
|
self.update_body()
|
|
if self.paused:
|
|
if self.prompt_ok:
|
|
inp.set_caption([('prompt_paused', PAUSED_PROMPT_A), ('prompt_ok', PAUSED_PROMPT_B)])
|
|
else:
|
|
inp.set_caption([('prompt_paused', PAUSED_PROMPT_A), ('prompt_err', PAUSED_PROMPT_B)])
|
|
self.loop.event_loop.remove_watch_file(self.inp_file.fileno())
|
|
else:
|
|
if self.prompt_ok:
|
|
inp.set_caption(('prompt_ok', PROMPT))
|
|
else:
|
|
inp.set_caption(('prompt_err', PROMPT))
|
|
self.loop.event_loop.watch_file(self.inp_file.fileno(), self._file_avail_cb)
|
|
|
|
def _jq_out_avail_cb(self):
|
|
if self.jq_proc.stdout not in select.select([self.jq_proc.stdout], [], [], 0)[0]:
|
|
# Ignore spurius calls
|
|
return
|
|
|
|
chunk = ''
|
|
while self.jq_proc.stdout in select.select([self.jq_proc.stdout], [], [], 0)[0]:
|
|
new_chunk = os.read(self.jq_proc.stdout.fileno(), 1024).decode()
|
|
if len(new_chunk) == 0:
|
|
break
|
|
chunk += new_chunk
|
|
|
|
if len(chunk) != 0:
|
|
self.out_data += chunk
|
|
if not self.paused:
|
|
self.update_body()
|
|
else:
|
|
if self.out_err == '':
|
|
if loop.screen_size is not None:
|
|
new_scroll_line = min(max(len(self.out_data.split('\n')) - int(loop.screen_size[1] / 2), 0), self.scroll_line)
|
|
if new_scroll_line != self.scroll_line:
|
|
self.scroll_line = new_scroll_line
|
|
self.update_body()
|
|
self.loop.event_loop.remove_watch_file(self.jq_proc.stdout.fileno())
|
|
self.jq_proc.stdout.close()
|
|
self.jq_proc.stdin.close()
|
|
self.jq_proc.wait()
|
|
|
|
def _jq_err_avail_cb(self):
|
|
if self.jq_proc.stderr not in select.select([self.jq_proc.stderr], [], [], 0)[0]:
|
|
# Ignore spurius calls
|
|
return
|
|
|
|
chunk = ''
|
|
while self.jq_proc.stderr in select.select([self.jq_proc.stderr], [], [], 0)[0]:
|
|
new_chunk = os.read(self.jq_proc.stderr.fileno(), 1024).decode()
|
|
if len(new_chunk) == 0:
|
|
break
|
|
chunk += new_chunk
|
|
|
|
if len(chunk) != 0:
|
|
self.out_err += chunk
|
|
err_bar.set_text(self.out_err.replace(' (Unix shell quoting issues?)', '').strip())
|
|
else:
|
|
self.loop.event_loop.remove_watch_file(self.jq_proc.stderr.fileno())
|
|
self.jq_proc.stderr.close()
|
|
|
|
if self.out_err != '':
|
|
self.prompt_ok = False
|
|
|
|
if self.paused:
|
|
inp.set_caption([('prompt_paused', PAUSED_PROMPT_A), ('prompt_err', PAUSED_PROMPT_B)])
|
|
else:
|
|
inp.set_caption(('prompt_err', PROMPT))
|
|
elif self.out_data == '':
|
|
self.last_out_data = ''
|
|
self.update_body()
|
|
|
|
def respawn_jq(self, _, query):
|
|
if self.jq_proc is not None:
|
|
if not self.jq_proc.stdout.closed:
|
|
self.loop.event_loop.remove_watch_file(self.jq_proc.stdout.fileno())
|
|
if not self.jq_proc.stderr.closed:
|
|
self.loop.event_loop.remove_watch_file(self.jq_proc.stderr.fileno())
|
|
self.jq_proc.stdin.close()
|
|
self.jq_proc.stdout.close()
|
|
self.jq_proc.stderr.close()
|
|
self.jq_proc.terminate()
|
|
self.jq_proc.wait()
|
|
err_bar.set_text('')
|
|
if self.out_data != '' and not self.paused:
|
|
self.last_out_data = self.out_data
|
|
self.out_data = ''
|
|
self.out_err = ''
|
|
self.prompt_ok = True
|
|
if self.paused:
|
|
inp.set_caption([('prompt_paused', PAUSED_PROMPT_A), ('prompt_ok', PAUSED_PROMPT_B)])
|
|
else:
|
|
inp.set_caption(('prompt_ok', PROMPT))
|
|
|
|
self.jq_proc = sp.Popen([self._jq_path, query], stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.PIPE, bufsize=0)
|
|
try:
|
|
encoded_inp_data = self.inp_data.encode()
|
|
try:
|
|
for offset in range(0, len(encoded_inp_data), 1024):
|
|
self.jq_proc.stdin.write(encoded_inp_data[offset:offset+1024])
|
|
self._jq_out_avail_cb()
|
|
self._jq_err_avail_cb()
|
|
if self.is_inp_data_done:
|
|
self.jq_proc.stdin.close()
|
|
self.loop.event_loop.watch_file(self.jq_proc.stdout.fileno(), self._jq_out_avail_cb)
|
|
self.loop.event_loop.watch_file(self.jq_proc.stderr.fileno(), self._jq_err_avail_cb)
|
|
except ValueError:
|
|
pass
|
|
except BrokenPipeError:
|
|
pass
|
|
|
|
def update_body(self):
|
|
height = 256
|
|
if self.loop.screen_size is not None:
|
|
height = self.loop.screen_size[1]
|
|
|
|
if self.out_data and not self.paused:
|
|
body.set_text('\n'.join(self.out_data.split('\n')[self.scroll_line:][:height]))
|
|
else:
|
|
body.set_text('\n'.join(self.last_out_data.split('\n')[self.scroll_line:][:height]))
|
|
|
|
class BetterEdit(urwid_readline.ReadlineEdit):
|
|
def keypress(self, size, key):
|
|
if key == 'ctrl left':
|
|
try:
|
|
self.edit_pos = self.edit_text[:self.edit_pos].rindex(' ')
|
|
except ValueError:
|
|
self.edit_pos = 0
|
|
elif key == 'ctrl right':
|
|
try:
|
|
self.edit_pos += self.edit_text[self.edit_pos:].index(' ') + 1
|
|
except ValueError:
|
|
self.edit_pos = len(self.edit_text)
|
|
elif key == 'ctrl p':
|
|
jq_man.toggle_pause()
|
|
elif key in ('up', 'down', 'page up', 'page down'):
|
|
if key == 'up':
|
|
jq_man.scroll_line = max(0, jq_man.scroll_line - 1)
|
|
elif key == 'down':
|
|
jq_man.scroll_line = min(max(len(jq_man.out_data.split('\n')) - int(loop.screen_size[1] / 2), 0), jq_man.scroll_line + 1)
|
|
elif key == 'page up':
|
|
jq_man.scroll_line = max(0, jq_man.scroll_line - int(loop.screen_size[1] / 2))
|
|
elif key == 'page down':
|
|
jq_man.scroll_line = min(max(len(jq_man.out_data.split('\n')) - int(loop.screen_size[1] / 2), 0), jq_man.scroll_line + int(loop.screen_size[1] / 2))
|
|
jq_man.update_body()
|
|
else:
|
|
return super().keypress(size, key)
|
|
|
|
|
|
class WSLScreen(urwid.raw_display.Screen):
|
|
"""
|
|
This class is used to fix issue #6, where urwid has artifacts under WSL
|
|
"""
|
|
def write(self, data):
|
|
# replace urwid's SI/SO, which produce artifacts under WSL.
|
|
# at some point we may figure out what they actually do.
|
|
data = re.sub("[\x0e\x0f]", "", data)
|
|
super().write(data)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
if sys.stdin.isatty():
|
|
sys.stderr.write('error: jqed requires some data piped on standard input, for example try: `ip --json link | jqed`\n')
|
|
exit(1)
|
|
|
|
if len(sys.argv) > 2:
|
|
sys.stderr.write('usage: jqed [initial expression]\n')
|
|
exit(1)
|
|
|
|
# Preserve original stdio, and replace stdio with /dev/tty
|
|
orig_stdin = os.fdopen(os.dup(sys.stdin.fileno()))
|
|
orig_stdout = os.fdopen(os.dup(sys.stdout.fileno()), mode='wb', buffering=0)
|
|
|
|
os.close(0)
|
|
os.close(1)
|
|
sys.stdin = open('/dev/tty', 'rb')
|
|
sys.stdout = open('/dev/tty', 'wb')
|
|
|
|
# Apparently urwid has some artifacts with WSL, see issue #6
|
|
# Hopefully this won't break WSL2
|
|
if IS_WSL:
|
|
urwid_screen = WSLScreen()
|
|
else:
|
|
urwid_screen = urwid.raw_display.Screen()
|
|
|
|
|
|
# Create gui
|
|
inp = BetterEdit(('prompt_ok', PROMPT))
|
|
if len(sys.argv) == 2:
|
|
# If the user specified an argument, use it as an initial expression
|
|
inp.set_edit_text(sys.argv[1])
|
|
inp.set_edit_pos(len(sys.argv[1]))
|
|
body = urwid.Text('')
|
|
body_filler = urwid.AttrMap(urwid.Filler(body, 'top'), 'body_plain')
|
|
err_bar = urwid.Text(('inp_plain', 'HELP: ^C: Exit, ^P: Pause, jq manual: https://stedolan.github.io/jq/manual'))
|
|
|
|
frame = urwid.Frame(
|
|
body_filler,
|
|
header=urwid.AttrMap(inp, 'inp_plain'),
|
|
footer=urwid.AttrMap(err_bar, 'err_bar'),
|
|
focus_part='header'
|
|
)
|
|
loop = urwid.MainLoop(frame, palette, handle_mouse=False, screen=urwid_screen)
|
|
try:
|
|
jq_man = JqManager(orig_stdin, loop)
|
|
loop.run()
|
|
except KeyboardInterrupt:
|
|
line = shlex.quote(inp.edit_text.strip())
|
|
if line.startswith("''"):
|
|
line = line[2:]
|
|
if line.endswith("''"):
|
|
line = line[:-2]
|
|
try:
|
|
orig_stdout.write(
|
|
('{}\njqed: jq editor ' + VERSION + ' https://github.com/wazzaps/jqed\n' +
|
|
'jqed: | jq {}\n').format(jq_man.out_data, line).encode())
|
|
except BrokenPipeError:
|
|
sys.stderr.write('jq {}\n'.format(line))
|
|
exit(0)
|