Select Git revision
-
Martin Mareš authoredMartin Mareš authored
daemon.py 3.34 KiB
#!/usr/bin/env python3
import sys
import os
import osd
import threading
from collections import defaultdict
import Xlib
import Xlib.display
import Xlib.X
import i3ipc
import time
import traceback
import signal
from woman.shared import *
from woman.lib import *
import woman.util as util
import woman.help as help
from woman.watch import i3_watch
from woman.main_functions import main_functions
from woman.constants import *
parser = argparse.ArgumentParser(description='I3 2D workspace manager daemon', epilog='See -H option for more information about i3-workspace.')
parser.add_argument('-H', "--usage", action='store_true', help="Show text about how to use whole i3-workspace manager.")
parser.add_argument('-l', "--license", action='store_true', help="Show license of this program.")
parser.add_argument('-V', "--version", action='store_true', help="Show current version.")
parser.add_argument('-g', "--gui", action='store_true', help="Prepare window with list of workspaces. Use `i3-workspace gui` to open it.")
params = parser.parse_args()
if params.usage:
print(help.daemon_usage)
exit(0)
if params.license:
print(help.LICENSE_ALL)
exit(0)
if params.version:
print(help.VERSION)
exit(0)
USE_QT = params.gui
display = Xlib.display.Display()
root_win = display.screen().root
root_win.change_attributes(event_mask=Xlib.X.PropertyChangeMask)
osd.default_connection = osd.Connection(display)
def get_queue():
queue_atom = display.get_atom(X_QUEUE_ATOM)
root_win.delete_property(queue_atom)
while True:
e = display.next_event()
if e.type == Xlib.X.PropertyNotify:
if(e.state == Xlib.X.PropertyNewValue
and e.window == root_win
and e.atom == queue_atom):
x = root_win.get_property(queue_atom, Xlib.Xatom.STRING, 0, 1, delete=1)
if x is None:
continue
msg = x.value
while x.bytes_after > 0:
x = root_win.get_property(queue_atom, Xlib.Xatom.STRING, len(msg) // 4, 1, delete=1)
if x is None:
break
msg += x.value
display.flush()
yield from map(lambda x: x[1:].split('\n>'), msg.decode().split('\n\n')[:-1])
def x_main():
shared.init_thread()
load_workspaces()
goto_workspace(1, 1)
for msg in get_queue():
print(msg)
if len(msg) >= 1:
try:
with shared.lock:
f = main_functions[msg[0]]
parser = util.ArgumentParserNoFail()
for (args, kvargs) in f.args:
parser.add_argument(*args, **kvargs)
params = parser.parse_args(msg[1:])
f.f(params)
except WorkspaceOutOfRange:
osd.notify("No more workspaces", color="magenta", to='display', min_duration=0)
except Exception as e:
traceback.print_exc()
i3_watch_thread = threading.Thread(target=i3_watch)
i3_watch_thread.daemon = True
i3_watch_thread.start()
if USE_QT:
from woman.qt import qt_main
x_thread = threading.Thread(target=x_main)
x_thread.daemon = True
x_thread.start()
signal.signal(signal.SIGINT, signal.SIG_DFL)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
qt_main()
else:
x_main()