Skip to content
Snippets Groups Projects
Select Git revision
  • a46c22c14c1d85a34589581f5846264ff3801b73
  • master default
2 results

daemon.py

Blame
  • daemon.py 3.34 KiB
    #!/usr/bin/env python3
    import sys
    import os
    import osd
    import threading
    from collections import defaultdict
    
    import Xlib
    import Xlib.display
    import Xlib.X
    
    
    import i3ipc
    import time
    import traceback
    
    import signal
    
    from woman.shared import *
    from woman.lib import *
    import woman.util as util
    import woman.help as help
    from woman.watch import i3_watch
    from woman.main_functions import main_functions
    from woman.constants import *
    
    
    
    parser = argparse.ArgumentParser(description='I3 2D workspace manager daemon', epilog='See -H option for more information about i3-workspace.')
    parser.add_argument('-H', "--usage", action='store_true', help="Show text about how to use whole i3-workspace manager.")
    parser.add_argument('-l', "--license", action='store_true', help="Show license of this program.")
    parser.add_argument('-V', "--version", action='store_true', help="Show current version.")
    parser.add_argument('-g', "--gui", action='store_true', help="Prepare window with list of workspaces. Use `i3-workspace gui` to open it.")
    params = parser.parse_args()
    
    if params.usage:
        print(help.daemon_usage)
        exit(0)
    
    if params.license:
        print(help.LICENSE_ALL)
        exit(0)
    
    if params.version:
        print(help.VERSION)
        exit(0)
    
    USE_QT = params.gui
    
    display = Xlib.display.Display()
    root_win = display.screen().root
    
    root_win.change_attributes(event_mask=Xlib.X.PropertyChangeMask)
    
    osd.default_connection = osd.Connection(display)
    
    
    def get_queue():
        queue_atom = display.get_atom(X_QUEUE_ATOM)
        root_win.delete_property(queue_atom)
        while True:
            e = display.next_event()
            if e.type == Xlib.X.PropertyNotify:
                if(e.state == Xlib.X.PropertyNewValue
                    and e.window == root_win
                    and e.atom == queue_atom):
                    x = root_win.get_property(queue_atom, Xlib.Xatom.STRING, 0, 1, delete=1)
                    if x is None:
                        continue
                    msg = x.value
                    while x.bytes_after > 0:
                        x = root_win.get_property(queue_atom, Xlib.Xatom.STRING, len(msg) // 4, 1, delete=1)
                        if x is None:
                            break
                        msg += x.value
                    display.flush()
                    yield from map(lambda x: x[1:].split('\n>'), msg.decode().split('\n\n')[:-1])
    
    
    def x_main():
        shared.init_thread()
    
        load_workspaces()
    
        goto_workspace(1, 1)
    
        for msg in get_queue():
            print(msg)
            if len(msg) >= 1:
                try:
                    with shared.lock:
                        f = main_functions[msg[0]]
                        parser = util.ArgumentParserNoFail()
                        for (args, kvargs) in f.args:
                            parser.add_argument(*args, **kvargs)
                        params = parser.parse_args(msg[1:])
                        f.f(params)
                except WorkspaceOutOfRange:
                    osd.notify("No more workspaces", color="magenta", to='display', min_duration=0)
                except Exception as e:
                    traceback.print_exc()
    
    i3_watch_thread = threading.Thread(target=i3_watch)
    i3_watch_thread.daemon = True
    i3_watch_thread.start()
    
    if USE_QT:
        from woman.qt import qt_main
        x_thread = threading.Thread(target=x_main)
        x_thread.daemon = True
        x_thread.start()
    
        signal.signal(signal.SIGINT, signal.SIG_DFL)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        qt_main()
    else:
        x_main()