#!/usr/bin/env python3
"""jqed: an interactive editor for jq expressions with a live preview.

Data is read from the original standard input while the terminal UI runs on
/dev/tty.  Every change to the expression re-runs ``jq`` over the data
received so far and shows the result.  On Ctrl-C the last output and the
final expression are written to the original standard output.
"""
import codecs
import os
import platform
import re
import select
import shlex
import shutil
import subprocess as sp
import sys

import urwid
import urwid_readline

VERSION = 'v0.1.3 (2020-09-25)'
PROMPT = 'jq> '
PAUSED_PROMPT_A = '||'
PAUSED_PROMPT_B = '> '
IS_WSL = "Microsoft" in platform.platform()

# Size (in bytes) of every read from / write to a pipe.
CHUNK_SIZE = 1024
# Body height assumed until urwid has measured the screen.
DEFAULT_HEIGHT = 256
# Shift-in / shift-out control characters emitted by urwid (see WSLScreen).
_SI_SO_RE = re.compile("[\x0e\x0f]")

palette = [
    ('prompt_ok', 'light green,bold', 'default'),
    ('prompt_paused', 'yellow,bold', 'default'),
    ('prompt_err', 'light red,bold', 'default'),
    ('inp_plain', 'bold', 'default'),
    ('body_plain', '', 'default'),
    ('err_bar', 'light red,bold', 'default'),
]


class JqManager:
    """Owns the jq child process and the data flowing through it.

    The manager buffers everything read from ``inp_file``, replays that buffer
    into a fresh jq process whenever the query changes, and collects jq's
    stdout/stderr through event-loop callbacks.

    It relies on the module-level widgets ``inp``, ``body`` and ``err_bar``
    (created in the ``__main__`` section) for display.
    """

    def __init__(self, inp_file, loop):
        """Start watching ``inp_file`` and spawn jq with the current query.

        :param inp_file: file object whose descriptor delivers the input data
        :param loop: the ``urwid.MainLoop`` driving the UI
        Exits the process with status 1 when no ``jq`` executable is found.
        """
        self.inp_file = inp_file
        self.loop = loop
        self.loop.event_loop.watch_file(self.inp_file.fileno(), self._file_avail_cb)

        # Raw input bytes received so far.  Kept undecoded so that a
        # multi-byte character split across two reads cannot break decoding,
        # and so jq receives exactly the bytes that were piped in.
        self.inp_data = bytearray()
        self.last_out_data = ''
        self.out_data = ''
        self.out_err = ''
        self.scroll_line = 0
        self.paused = False
        self.prompt_ok = True
        self.is_inp_data_done = False

        self._jq_path = shutil.which('jq')
        if not self._jq_path:
            message = 'jq does not seem to be installed\nPerhaps you want: sudo apt install jq\n'
            try:
                orig_stdout.write(message.encode())
            except BrokenPipeError:
                sys.stderr.write(message)
            sys.exit(1)

        self.jq_proc = None
        self._out_decoder = None
        self._err_decoder = None
        self.respawn_jq(None, inp.get_edit_text())
        urwid.connect_signal(inp, 'change', self.respawn_jq)

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _read_available(stream):
        """Read whatever ``stream`` can deliver right now without blocking.

        Returns ``None`` when nothing is readable (a spurious wake-up),
        ``b''`` when the stream is at end-of-file, and the bytes read
        otherwise.
        """
        fd = stream.fileno()

        def readable():
            return bool(select.select([fd], [], [], 0)[0])

        if not readable():
            return None
        pieces = []
        while readable():
            piece = os.read(fd, CHUNK_SIZE)
            if not piece:
                break
            pieces.append(piece)
        return b''.join(pieces)

    def _set_prompt(self):
        """Make the prompt caption reflect the paused / error state."""
        state = 'prompt_ok' if self.prompt_ok else 'prompt_err'
        if self.paused:
            inp.set_caption([('prompt_paused', PAUSED_PROMPT_A), (state, PAUSED_PROMPT_B)])
        else:
            inp.set_caption((state, PROMPT))

    def _screen_height(self):
        """Return the screen height, or a default before the first draw."""
        if self.loop.screen_size is None:
            return DEFAULT_HEIGHT
        return self.loop.screen_size[1]

    def half_page(self):
        """Return the number of lines in half a screen."""
        return self._screen_height() // 2

    def max_scroll_line(self):
        """Return the largest scroll offset allowed for the current output."""
        return max(len(self.out_data.split('\n')) - self.half_page(), 0)

    def scroll(self, delta):
        """Scroll the body by ``delta`` lines (negative scrolls up) and redraw."""
        target = self.scroll_line + delta
        if delta < 0:
            self.scroll_line = max(0, target)
        else:
            self.scroll_line = min(self.max_scroll_line(), target)
        self.update_body()

    # ---------------------------------------------------------------- callbacks

    def _file_avail_cb(self):
        """Event-loop callback: the input file has data (or reached EOF)."""
        fd = self.inp_file.fileno()
        chunk = os.read(fd, CHUNK_SIZE)
        if chunk:
            self.inp_data += chunk
            try:
                self.jq_proc.stdin.write(chunk)
            except (ValueError, BrokenPipeError):
                # ValueError: jq's stdin was already closed.
                # BrokenPipeError: jq already exited (e.g. invalid query).
                # Either way the data stays buffered for the next respawn.
                pass
        else:
            self.loop.event_loop.remove_watch_file(fd)
            self.is_inp_data_done = True
            self.jq_proc.stdin.close()

    def toggle_pause(self):
        """Toggle paused mode; while paused no input is read and the body is frozen."""
        self.paused = not self.paused
        if self.out_data != '':
            self.last_out_data = self.out_data
        if self.prompt_ok:
            self.update_body()
        self._set_prompt()

        if self.paused:
            self.loop.event_loop.remove_watch_file(self.inp_file.fileno())
        elif not self.is_inp_data_done:
            # Nothing is left to read once the input hit EOF, so only resume
            # watching while it is still open.
            self.loop.event_loop.watch_file(self.inp_file.fileno(), self._file_avail_cb)

    def _jq_out_avail_cb(self):
        """Event-loop callback: jq's stdout has data (or reached EOF)."""
        stdout = self.jq_proc.stdout
        if stdout.closed:
            return
        raw = self._read_available(stdout)
        if raw is None:
            # Ignore spurious calls
            return

        if raw:
            # The incremental decoder keeps a trailing partial character
            # until the rest of it arrives with the next chunk.
            self.out_data += self._out_decoder.decode(raw)
            if not self.paused:
                self.update_body()
            return

        # End of output: flush any bytes the decoder is still holding.
        tail = self._out_decoder.decode(b'', final=True)
        if tail:
            self.out_data += tail
            if not self.paused:
                self.update_body()

        # Pull the scroll position back if the new output is shorter.
        if self.out_err == '' and self.loop.screen_size is not None:
            new_scroll_line = min(self.max_scroll_line(), self.scroll_line)
            if new_scroll_line != self.scroll_line:
                self.scroll_line = new_scroll_line
                self.update_body()

        self.loop.event_loop.remove_watch_file(stdout.fileno())
        stdout.close()
        self.jq_proc.stdin.close()
        self.jq_proc.wait()

    def _jq_err_avail_cb(self):
        """Event-loop callback: jq's stderr has data (or reached EOF)."""
        stderr = self.jq_proc.stderr
        if stderr.closed:
            return
        raw = self._read_available(stderr)
        if raw is None:
            # Ignore spurious calls
            return

        if raw:
            self.out_err += self._err_decoder.decode(raw)
            err_bar.set_text(self.out_err.replace(' (Unix shell quoting issues?)', '').strip())
            return

        self.loop.event_loop.remove_watch_file(stderr.fileno())
        stderr.close()
        if self.out_err != '':
            self.prompt_ok = False
            self._set_prompt()
        elif self.out_data == '':
            # jq succeeded but printed nothing: clear the stale preview.
            self.last_out_data = ''
            self.update_body()

    # ------------------------------------------------------------ jq lifecycle

    def _stop_jq(self):
        """Unregister, close and reap the current jq process, if any."""
        proc = self.jq_proc
        if proc is None:
            return
        for stream in (proc.stdout, proc.stderr):
            if not stream.closed:
                self.loop.event_loop.remove_watch_file(stream.fileno())
        proc.stdin.close()
        proc.stdout.close()
        proc.stderr.close()
        proc.terminate()
        proc.wait()

    def _feed_buffered_input(self):
        """Replay all input received so far into the current jq process.

        jq's output is drained after every chunk so that jq cannot stall on a
        full stdout/stderr pipe while this method is blocked writing to its
        stdin.  Raises ``BrokenPipeError`` or ``ValueError`` when jq stops
        accepting input.
        """
        snapshot = bytes(self.inp_data)
        for offset in range(0, len(snapshot), CHUNK_SIZE):
            self.jq_proc.stdin.write(snapshot[offset:offset + CHUNK_SIZE])
            self._jq_out_avail_cb()
            self._jq_err_avail_cb()

    def respawn_jq(self, _, query):
        """Restart jq with ``query`` (signature matches urwid's 'change' signal).

        :param _: the widget that emitted the signal (unused)
        :param query: the jq expression to run
        """
        self._stop_jq()

        err_bar.set_text('')
        if self.out_data != '' and not self.paused:
            self.last_out_data = self.out_data
        self.out_data = ''
        self.out_err = ''
        self.prompt_ok = True
        self._set_prompt()

        self.jq_proc = sp.Popen([self._jq_path, query],
                                stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.PIPE, bufsize=0)
        # Fresh decoders per process: output is read in fixed-size chunks that
        # may end in the middle of a multi-byte UTF-8 character.
        make_decoder = codecs.getincrementaldecoder('utf-8')
        self._out_decoder = make_decoder(errors='replace')
        self._err_decoder = make_decoder(errors='replace')

        try:
            self._feed_buffered_input()
        except (BrokenPipeError, ValueError):
            # jq stopped reading (it exited, or its pipes were closed while
            # draining).  Its remaining output is still collected below.
            pass
        else:
            if self.is_inp_data_done:
                self.jq_proc.stdin.close()

        # Always watch whatever is still open, even when feeding failed;
        # otherwise jq's error message would never be read or displayed.
        for stream, callback in ((self.jq_proc.stdout, self._jq_out_avail_cb),
                                 (self.jq_proc.stderr, self._jq_err_avail_cb)):
            if not stream.closed:
                self.loop.event_loop.watch_file(stream.fileno(), callback)

    def update_body(self):
        """Show the visible window of the current (or last good) output."""
        height = self._screen_height()
        if self.out_data and not self.paused:
            text = self.out_data
        else:
            text = self.last_out_data
        body.set_text('\n'.join(text.split('\n')[self.scroll_line:][:height]))


class BetterEdit(urwid_readline.ReadlineEdit):
    """Readline-style edit box with word jumps, pause and body scrolling keys."""

    def keypress(self, size, key):
        """Handle jqed-specific keys; delegate everything else to ReadlineEdit."""
        if key == 'ctrl left':
            # Jump to the previous space, or to the start of the line.
            self.edit_pos = max(0, self.edit_text.rfind(' ', 0, self.edit_pos))
        elif key == 'ctrl right':
            # Jump just past the next space, or to the end of the line.
            space_at = self.edit_text.find(' ', self.edit_pos)
            self.edit_pos = len(self.edit_text) if space_at == -1 else space_at + 1
        elif key == 'ctrl p':
            jq_man.toggle_pause()
        elif key == 'up':
            jq_man.scroll(-1)
        elif key == 'down':
            jq_man.scroll(1)
        elif key == 'page up':
            jq_man.scroll(-jq_man.half_page())
        elif key == 'page down':
            jq_man.scroll(jq_man.half_page())
        else:
            return super().keypress(size, key)


class WSLScreen(urwid.raw_display.Screen):
    """ This class is used to fix issue #6, where urwid has artifacts under WSL """

    def write(self, data):
        # replace urwid's SI/SO, which produce artifacts under WSL.
        # at some point we may figure out what they actually do.
        data = _SI_SO_RE.sub("", data)
        super().write(data)


if __name__ == '__main__':
    if sys.stdin.isatty():
        sys.stderr.write('error: jqed requires some data piped on standard input, for example try: `ip --json link | jqed`\n')
        sys.exit(1)

    if len(sys.argv) > 2:
        sys.stderr.write('usage: jqed [initial expression]\n')
        sys.exit(1)

    # Preserve original stdio, and replace stdio with /dev/tty
    orig_stdin = os.fdopen(os.dup(sys.stdin.fileno()))
    orig_stdout = os.fdopen(os.dup(sys.stdout.fileno()), mode='wb', buffering=0)
    os.close(0)
    os.close(1)
    sys.stdin = open('/dev/tty', 'rb')
    sys.stdout = open('/dev/tty', 'wb')

    # Apparently urwid has some artifacts with WSL, see issue #6
    # Hopefully this won't break WSL2
    if IS_WSL:
        urwid_screen = WSLScreen()
    else:
        urwid_screen = urwid.raw_display.Screen()

    # Create gui
    inp = BetterEdit(('prompt_ok', PROMPT))
    if len(sys.argv) == 2:
        # If the user specified an argument, use it as an initial expression
        inp.set_edit_text(sys.argv[1])
        inp.set_edit_pos(len(sys.argv[1]))
    body = urwid.Text('')
    body_filler = urwid.AttrMap(urwid.Filler(body, 'top'), 'body_plain')
    err_bar = urwid.Text(('inp_plain', 'HELP: ^C: Exit, ^P: Pause, jq manual: https://stedolan.github.io/jq/manual'))
    frame = urwid.Frame(
        body_filler,
        header=urwid.AttrMap(inp, 'inp_plain'),
        footer=urwid.AttrMap(err_bar, 'err_bar'),
        focus_part='header'
    )

    loop = urwid.MainLoop(frame, palette, handle_mouse=False, screen=urwid_screen)
    # Defined up front so the Ctrl-C handler below works even when the
    # interrupt arrives before the manager has been constructed.
    jq_man = None
    try:
        jq_man = JqManager(orig_stdin, loop)
        loop.run()
    except KeyboardInterrupt:
        line = shlex.quote(inp.edit_text.strip())
        # shlex.quote wraps a leading/trailing single quote in an empty
        # '' pair; drop those for a tidier command line.
        if line.startswith("''"):
            line = line[2:]
        if line.endswith("''"):
            line = line[:-2]
        final_output = jq_man.out_data if jq_man is not None else ''
        try:
            orig_stdout.write(
                '{}\njqed: jq editor {} https://github.com/wazzaps/jqed\njqed: | jq {}\n'
                .format(final_output, VERSION, line).encode())
        except BrokenPipeError:
            sys.stderr.write('jq {}\n'.format(line))
        sys.exit(0)