Skip to content
Snippets Groups Projects
Commit c2815721 authored by Jiří Kalvoda's avatar Jiří Kalvoda
Browse files

INIT

parents
No related branches found
No related tags found
No related merge requests found
vm.py 0 → 100755
#!/bin/python3
from subprocess import run, PIPE
import subprocess
import sys, os, pathlib
import argparse
import time
import json
from dataclasses import dataclass
import functools
from typing import Optional
socket_path = '.socket'
is_daemon = False
if __name__ == "__main__":
if len(sys.argv)==2 and sys.argv[1]=="server":
is_daemon = True
class S():
'''
Class for nice formated long text area.
Use S-"""
Text
"""
It will remove all tailing and leading empty lines.
Then it will remove as many posiible leading spaces
from each lines (from each the same number of spaces).
'''
def __sub__(_, a):
lines = a.split("\n")
while len(lines) and lines[0].strip() == "":
lines.pop(0)
while len(lines) and lines[-1].strip() == "":
lines.pop(-1)
def space_count(s):
r = 0
while r < len(s) and s[r]==' ':
r += 1
return r
to_remove = min(space_count(l) for l in lines if l.strip() != "")
return "\n".join("" if len(l) < to_remove else l[to_remove:] for l in lines)
S=S()
force=False
no_daemon=False
def r(*arg):
print(">", " ".join(arg))
run(arg, check=not force)
def nft(rules):
print("\n".join("@ "+i for i in rules.split("\n")))
#p = subprocess.Popen(["nft", "-i"], stdin=PIPE, encoding='utf-8')
run(["nft", rules], check=not force)
#p.communicate(input=rules)
#p.wait()
#if p.returncode: raise RuntimeError("Wrong returnoce")
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(help="commands", dest="subcommand")
subcommands = {}
parser.add_argument("-f", "--force", action='store_true')
parser.add_argument("-r", "--root_folder", type=str)
def get_spec(f):
import inspect
if 'spec' not in f.__dict__:
f.spec = inspect.getfullargspec(f)
return f.spec
def cmd(f):
import inspect
spec = get_spec(f)
subcommands[f.__name__] = f
f.parser = subparsers.add_parser(f.__name__)
for i, arg in enumerate(spec.args):
annotation = spec.annotations.get(arg, None)
has_default = spec.defaults is not None and i >= len(spec.args) - len(spec.defaults)
if annotation in [str, int, float]:
f.parser.add_argument(
("--" if has_default else "")+arg,
type=annotation,
)
if annotation in [bool]:
f.parser.add_argument(
"--"+arg,
action="store_true",
)
if spec.varargs is not None:
arg = spec.varargs
annotation = spec.annotations.get(arg, None)
if annotation == tuple[str, ...]:
f.parser.add_argument(
arg,
type=str, nargs=argparse.REMAINDER,
)
return f
def random_passwd():
return "".join(chr(ord('0') + i%10) for i in os.urandom(50))
@dataclass
class Ucred:
pid: int
uid: int
gid: int
def my_ucred():
return Ucred(os.getpid(), os.getuid(), os.getgid())
def recvall(sock):
BUFF_SIZE = 4096
data = bytearray()
while True:
packet = sock.recv(BUFF_SIZE)
if len(packet) == 0:
break
data.extend(packet)
return data
daemon_funcs = {}
def ask_server(in_struct):
import socket
connection = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
connection.connect(root_folder+socket_path)
in_data = json.dumps(in_struct).encode('utf-8')
connection.sendall(in_data)
connection.shutdown(socket.SHUT_WR)
out_data = recvall(connection)
out_struct = json.loads(out_data)
return out_struct
def daemon(f):
spec = get_spec(f)
assert spec.args[0] == 'ucred'
spec = spec._replace(args=spec.args[1:])
daemon_funcs[f.__name__] = f
if is_daemon:
return f
# TODO validate types
def l(*arg, **kvarg):
if no_daemon:
f(my_ucred(), *arg, **kvarg)
r = ask_server({"fname":f.__name__, "arg": arg, "kvarg": kvarg})
return r["return"]
l.__name__ = f.__name__
l.spec = spec
return l
##########################################################
root_folder="/mnt/virtual/"
boot_id = open("/proc/sys/kernel/random/boot_id").read().strip()
state_not_registered = "not registered"
state_powered_off = "powered off"
state_aborted = "aborted"
state_saved = "saved"
def vm_dir(vm: str):
return f"{root_folder}/{vm}.vm/"
@cmd
def name_to_id(name: str) -> str:
assert is_valid_id(name) or is_valid_name(name)
if is_valid_name(name):
l = str(pathlib.Path(root_folder+name+".vm").readlink())
assert l[-3:] == ".vm"
name = l[:-3]
assert is_valid_id(name)
assert not os.path.islink(name+".vm")
return name
@cmd
def name(vm: str) -> str:
vm = name_to_id(vm)
return open(vm+".vm/name").read().strip()
def is_valid_name(name):
return all(i.isalpha() or i.isnumeric() or i in "-_" for i in name) and any(i.isalpha() for i in name)
def is_valid_id(id):
return all(i.isnumeric() for i in id)
def has_read_acces(ucred, vm: str):
# TODO!
return True
def has_write_acces(ucred, vm: str):
# TODO!
return True
@cmd
def get_ip(vm: str) -> str:
network_dir = vm_dir(vm)+"network/"
net_id = open(network_dir+"net_id").read().strip()
return f'10.37.{net_id}.150'
@cmd
def get_permanency(vm: str):
return open(vm_dir(vm)+"permanency").read().strip()
@cmd
@daemon
def set_permanency(ucred, vm: str, permanency: str):
vm = name_to_id(vm)
assert has_write_acces(ucred, vm)
assert permanency in ["tmp", "stable"]
with open(vm_dir(vm)+"permanency", "w") as f: f.write(permanency)
@cmd
def give_to_user(vm: str, uid: int, gid: Optional[int] = None):
if gid is None:
import pwd
gid = pwd.getpwuid(uid).pw_gid
vm = name_to_id(vm)
os.chown(vm_dir(vm)+"id_ed25519", uid, gid)
@cmd
@daemon
def state(ucred, vm: str):
vm = name_to_id(vm)
assert has_read_acces(ucred, vm)
vm = name_to_id(vm)
p = run(["VBoxManage", "showvminfo", vm], capture_output=True, encoding='utf8')
if "Could not find a registered machine named " in p.stderr:
return state_not_registered
for l in p.stdout.split('\n'):
if l.startswith("State:"):
return l[10:].split("(")[0].strip()
print(p.stderr, file=sys.stderr)
raise RuntimeError()
@cmd
def create_from_img(target: str, new_ssh: bool =True, target_name = None):
target_name = target_name or target
target = name_to_id(target)
target_dir = vm_dir(target)
vnc_passwd = random_passwd()
with open(target_dir+"name", "w") as f: f.write(target_name)
with open(target_dir+"permanency", "w") as f: f.write(f"init {int(time.time())}")
with open(target_dir+"vnc_passwd", "w") as f: f.write(vnc_passwd)
mount_dir = target_dir+"mount/"
os.mkdir(mount_dir)
r('mount', '-o', 'loop,offset=210763776', '--type', 'ext4', target_dir+'img', mount_dir)
try:
with open(mount_dir+"/etc/hostname", "w") as f: f.write(target_name)
if new_ssh:
r("ssh-keygen", "-t", "ed25519", "-C", f"virtual for root,u@vm_{target}", "-f", target_dir+"id_ed25519", "-P", "")
for place in ["/root/.ssh/", "/home/u/.ssh/"]:
if os.path.isfile(mount_dir+place+"/authorized_keys"):
with open(mount_dir+place+"/authorized_keys", "w") as f:
f.write(open(target_dir+"id_ed25519.pub", "r").read())
r("ssh-keygen", "-t", "ed25519", "-C", f"hostkey of root,u@vm_{target}", "-f", target_dir+"ssh_host_ed25519_key", "-P", "")
for suffix in ["", ".pub"]:
with open(mount_dir+"/etc/ssh/ssh_host_ed25519_key"+suffix, "w") as f:
f.write(open(target_dir+"ssh_host_ed25519_key"+suffix, "r").read())
with open(target_dir+"known_hosts", "w") as f:
f.write(f"vm_{target} "+" ".join(open(target_dir+"ssh_host_ed25519_key"+suffix, "r").read().split()[:2]))
with open(mount_dir+"vnc_passwd", "w") as f: f.write(vnc_passwd)
finally:
r('umount', mount_dir)
os.rmdir(mount_dir)
with open(target_dir+"disk.vmdk", "w") as f:
f.write(S-f"""
# Disk DescriptorFile
version=1
CID=076a5ce7
parentCID=ffffffff
createType="fullDevice"
# Extent description
RW 209715200 FLAT "{target_dir}/img" 0
# The disk Data Base
#DDB
ddb.virtualHWVersion = "4"
ddb.adapterType="ide"
ddb.geometry.cylinders="16383"
ddb.geometry.heads="16"
ddb.geometry.sectors="63"
ddb.uuid.image="24823fb3-a4b8-4c04-a2f8-ef70fa38e6ab"
ddb.uuid.parent="00000000-0000-0000-0000-000000000000"
ddb.uuid.modification="41a03768-211c-4638-b47e-ce1ee3fe7200"
ddb.uuid.parentmodification="00000000-0000-0000-0000-000000000000"
ddb.geometry.biosCylinders="1024"
ddb.geometry.biosHeads="255"
ddb.geometry.biosSectors="63"
""")
r('VBoxManage', 'internalcommands', 'sethduuid', target_dir+"disk.vmdk")
r('VBoxManage', 'createvm', f'--name={target}', f"--basefolder={folder}/{target}.vm", "--register")
r('VBoxManage', 'modifyvm', target, '--firmware=efi')
r('VBoxManage', 'modifyvm', target, '--memory=4096')
r("VBoxManage", "storagectl", target, '--name', "SATA Controller", '--add', 'sata', '--bootable', 'on')
r("VBoxManage", "storageattach", target, "--storagectl", "SATA Controller", "--port", "0", "--device", "0", "--type", "hdd", "--medium", f"{folder}/{target_dir}/disk.vmdk")
create_net(target)
@cmd
@daemon
def clone(ucred, target: str, base: str = "base") -> str:
base = name_to_id(base)
assert has_read_acces(ucred, base)
target = clone_copy(target, vm_dir(base)+"img")
give_to_user(target, ucred.uid, ucred.gid)
return target
@cmd
def clone_copy(target: str, img_path: str) -> str:
import random
assert is_valid_name(target)
target_id = f"{random.randint(0, 999999):06}"
assert not os.path.exists(target_id+".vm")
assert not os.path.exists(target+".vm")
target_dir = target_id + ".vm/"
os.mkdir(target_dir)
r('ln', '-sr', target_id+".vm", target+".vm")
r('cp', '--reflink', "-n", img_path, target_dir+"img")
create_from_img(target_id, target_name=target)
return target_id
def ssh_args(vm: str, *arg: tuple[str, ...], user: str = "u"):
vm, user = extended_name(vm, user)
vm = name_to_id(vm)
return ['ssh', "-i", vm_dir(vm)+"id_ed25519",
"-o", f"UserKnownHostsFile={vm_dir(vm)}/known_hosts", "-o", "HostKeyAlgorithms=ssh-ed25519", "-o", f"HostKeyAlias=vm_{vm}",
f"{user}@{get_ip(vm)}", *arg]
@cmd
def ssh(vm: str, *arg: tuple[str, ...], user: str = "u"):
run(ssh_args(vm, *arg, user=user))
@cmd
def vncapp(vm: str, cmd: str):
import random
import psutil
vm, user = extended_name(vm)
vm = name_to_id(vm)
display_id=random.randint(1, 50)
vnc_server = subprocess.Popen(ssh_args(vm, f"(cat /vnc_passwd;echo; cat /vnc_passwd; echo;echo n) | vncpasswd; vncserver :{display_id}", user=user))
vnc_client_env = os.environ.copy()
vnc_client_env["VNC_PASSWORD"] = open(vm_dir(vm)+"vnc_passwd", "r").read().strip()
app = subprocess.Popen(ssh_args(vm, f"DISPLAY=:{display_id}", cmd, user=user));
time.sleep(1)
vnc_client = subprocess.Popen(["vncviewer", get_ip(vm)+f":{display_id}"], env=vnc_client_env)
def on_terminate(proc):
print("KILLING ALL APPS")
vnc_server.send_signal(15)
vnc_client.send_signal(15)
app.send_signal(15)
psutil.wait_procs([vnc_client, app, vnc_server], callback=on_terminate)
@cmd
def vncsession(vm: str, display_id: int =0):
import random
import psutil
vm, user = extended_name(vm)
vm = name_to_id(vm)
vnc_server = subprocess.Popen(ssh_args(vm, f"(cat /vnc_passwd;echo; cat /vnc_passwd; echo;echo n) | vncpasswd; vncserver :{display_id}", user=user))
vnc_client_env = os.environ.copy()
vnc_client_env["VNC_PASSWORD"] = open(vm_dir(vm)+"vnc_passwd", "r").read().strip()
time.sleep(1)
vnc_client = subprocess.Popen(["vncviewer", get_ip(vm)+f":{display_id}"], env=vnc_client_env)
def on_terminate(proc):
print("KILLING ALL APPS")
vnc_server.send_signal(15)
vnc_client.send_signal(15)
psutil.wait_procs([vnc_client, vnc_server], callback=on_terminate)
def all_virtuals():
for f in os.listdir(root_folder):
if f.endswith(".vm"):
vm = f[:-3]
if is_valid_id(vm):
yield vm
@cmd
@daemon
def clean(ucred):
for vm in all_virtuals():
if has_write_acces(ucred, vm):
permanency = get_permanency(vm)
if permanency == "tmp":
if state(ucred, vm) in [state_powered_off, state_not_registered, state_aborted, state_saved]:
remove_force(ucred, vm)
if permanency.startswith("init "):
init_time = int(permanency[5:])
if time.time() - init_time > 100:
remove_force(ucred, vm)
@cmd
def remove_net(vm: str):
network_dir = vm_dir(vm)+"network/"
if os.path.isfile(network_dir+"interface"):
interface = open(network_dir+"interface").read().strip()
if os.path.isfile(network_dir+"interface"):
interface = open(network_dir+"interface").read().strip()
if open(network_dir+"boot_id", "r").read().strip() == boot_id:
if os.path.isfile(network_dir+"dhcp.pid"):
r("kill", open(network_dir+"dhcp.pid").read().strip())
net_id = int(open(network_dir+"net_id", "r").read())
r("ip", "addr", "del", f"10.37.{net_id}.1/24", "dev", interface)
r("VBoxManage", "hostonlyif", "remove", interface)
r('rm', '-r', network_dir)
@cmd
def create_net(vm: str):
remove_net(vm)
network_dir = vm_dir(vm)+"network/"
os.mkdir(network_dir)
if os.path.isfile(network_dir+"interface"):
interface = open(network_dir+"interface").read().strip()
else:
p = run(["VBoxManage", "hostonlyif", "create"], capture_output=True, encoding='utf8')
if p.returncode:
print(p.stderr, file=sys.stderr)
raise RuntimeError()
interface = p.stdout.split("'")[1]
with open(network_dir+"interface", "w") as f:
f.write(interface)
net_id = int(interface[7:])
print("interface", interface)
r("VBoxManage", "hostonlyif", "ipconfig", interface, f"--ip=10.37.{net_id}.1", "--netmask=255.255.255.0")
r("sysctl", "net.ipv4.ip_forward=1")
with open(network_dir+"boot_id", "w") as f: f.write(boot_id)
with open(network_dir+"net_id", "w") as f: f.write(str(net_id))
#r("ip", "link", "add", f"v{net_id}h", "type", "veth", "peer", "name", f"v{net_id}g")
#r("ip", "link", "add", f"v{net_id}b", "type", "bridge")
#r("ifconfig", f"v{net_id}h", "up")
#r("ip", "link", "set", f"v{net_id}g", "master", f"v{net_id}b")
r("ifconfig", interface, f"10.37.{net_id}.1", "netmask", "255.255.255.0", "up");
nft(f"")
nft(S-f"""
add chain inet filter input_from_{interface}
add chain inet filter forward_from_{interface}
add chain inet filter forward_to_{interface}
insert rule inet filter input iifname {interface} jump input_from_{interface}
insert rule inet filter forward iifname {interface} jump forward_from_{interface}
insert rule inet filter forward oifname {interface} jump forward_to_{interface}
""")
modify_net(ucred, vm)
#nft("add rule inet filter forward iifname wlp1s0 accept")
r("VBoxManage", "modifyvm", vm, "--nic1=hostonly", f"--host-only-adapter1={interface}")
#r("VBoxManage", "modifyvm", vm, "--nic1=bridged", f"--bridgeadapter1=v{net_id}b")
with open(network_dir+"dhcp.lp", "w") as f: f.write("authoring-byte-order little-endian;")
with open(network_dir+"dhcp.pid", "w") as f: f.write("")
with open(network_dir+"dhcp.config", "w") as f:
f.write(S-f"""
option domain-name-servers 8.8.8.8, 8.8.4.4;
option subnet-mask 255.255.255.0;
option routers 10.37.{net_id}.1;
subnet 10.37.{net_id}.0 netmask 255.255.255.0 {{
range 10.37.{net_id}.150 10.37.{net_id}.250;
}}
""")
r("dhcpd", "-4", "-cf", network_dir+"dhcp.config", "-pf", network_dir+"dhcp.pid", "-lf", network_dir+"dhcp.lp", interface)
@cmd
@daemon
def modify_net(ucred, vm: str, van: bool = False, lan: bool = False, pc: bool = False, pc_all: bool = False):
vm = name_to_id(vm)
assert has_write_acces(ucred, vm)
assert not (pc_all and not pc)
assert not (lan and not van)
network_dir = vm_dir(vm)+"network/"
interface = open(network_dir+"interface").read().strip()
net_id = open(network_dir+"net_id").read().strip()
assert open(network_dir+"boot_id", "r").read().strip() == boot_id
local_ips = "{10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16}"
if van:
pass
todo = [f"flush chain inet filter input_from_{interface}",
f"flush chain inet filter forward_from_{interface}",
f"flush chain inet filter forward_to_{interface}"]
todo.append(f"add rule inet filter input_from_{interface} ct state {{ established, related }} accept")
if not pc:
todo.append(f"add rule inet filter input_from_{interface} drop")
if pc_all:
todo.append(f"add rule inet filter input_from_{interface} accept")
if lan:
todo.append(f"add rule inet filter forward_from_{interface} ip daddr {local_ips} accept")
todo.append(f"add rule inet filter forward_to_{interface} ip saddr {local_ips} ct state {{ established, related }} accept")
todo.append(f"add rule inet filter forward_to_{interface} ip saddr {local_ips} drop")
else:
todo.append(f"add rule inet filter forward_from_{interface} ip daddr {local_ips} drop")
todo.append(f"add rule inet filter forward_from_{interface} ip saddr {local_ips} drop")
if van:
todo.append(f"add rule inet filter forward_from_{interface} accept")
todo.append(f"add rule inet filter forward_to_{interface} ct state {{ established, related }} accept")
else:
todo.append(f"add rule inet filter forward_from_{interface} drop")
todo.append(f"add rule inet filter forward_to_{interface} drop")
nft("\n".join(todo))
@cmd
@daemon
def start(ucred, vm: str):
vm = name_to_id(vm)
assert has_write_acces(ucred, vm)
r("VBoxManage", "startvm", vm, "--type=headless")
if get_permanency(vm).startswith("init "):
set_permanency(ucred, vm, "tmp")
@cmd
@daemon
def try_ping(ucred, vm: str) -> bool:
vm = name_to_id(vm)
assert has_write_acces(ucred, vm)
from pythonping import ping
ip = get_ip(vm)
r = ping(ip, verbose=True, count=1, timeout=0.1)
return r.packets_lost == 0
@cmd
def wait_started(vm: str):
while True:
if try_ping(vm):
break
@cmd
def start_and_wait(vm: str):
vm = name_to_id(vm)
start(vm)
wait_started(vm)
@daemon
def poweroff(ucred, vm: str):
vm = name_to_id(vm)
assert has_write_acces(ucred, vm)
r("VBoxManage", "controlvm", vm, "acpipowerbutton")
@cmd
def poweroff_and_wait(vm: str):
vm = name_to_id(vm)
if state(vm) in [state_powered_off, state_not_registered, state_aborted, state_saved]: return
poweroff(vm)
while True:
time.sleep(0.1)
s = state(vm)
print("state", s)
if s == state_powered_off:
time.sleep(1)
return
@cmd
@daemon
def get_tmp_name(ucred) -> str:
last_id = int(open("last_tmp_id").read().strip())
last_id = (last_id+1) % 1000
while os.path.exists(f"tmp-{last_id}.vm"):
last_id = (last_id+1) % 1000
with open("last_tmp_id", "w") as f: f.write(str(last_id))
return f"tmp-{last_id}"
if is_daemon:
prepared_forks = {}
@cmd
@daemon
def get_prepared_fork(ucred, base: str = "base") -> Optional[str]:
name_to_id(base)
assert has_read_acces(ucred, base)
if base in prepared_forks and len(prepared_forks[base]):
vm = prepared_forks[base][0]
prepared_forks[base] = prepared_forks[base][1:]
give_to_user(vm, ucred.uid, ucred.gid)
return vm
@cmd
@daemon
def prepare_forks(ucred, base: str = "base", count: int =1) -> Optional[str]:
assert ucred.uid == 0
if not base in prepared_forks:
prepared_forks[base] = []
if len(prepared_forks[base]) < count:
target = get_tmp_name(ucred)
target = clone(ucred, target, base=base)
start(ucred, target)
prepared_forks[base].append(target)
return target
@cmd
def get_tmp_vm(base: str = "base"):
target = get_prepared_fork(base)
if not target:
target = get_tmp_name()
target = clone(target, base)
start(target)
return target
@cmd
def extended_name(vm: str, user: str = "u") -> tuple[str, str]:
assert not is_daemon
if len(vm.split("@"))==2:
user, vm = vm.split("@")
if len(vm.split("~"))==2:
base, vm = vm.split("~")
if not vm :
vm = get_tmp_vm(base or "base")
else:
vm = clone(vm, base or "base")
start(vm)
wait_started(vm)
vm = name_to_id(vm)
return vm, user
@daemon
def remove_force(ucred, vm: str, keep_image: bool = False):
vm = name_to_id(vm)
if os.path.isfile(f"{vm}.vm/no_remove"):
raise RuntimeError("Delete foribidden")
assert has_write_acces(ucred, vm)
if state(ucred, vm) != state_not_registered:
r("VBoxManage", "unregistervm", vm, "--delete-all")
remove_net(vm)
if keep_image:
for f in os.listdir(vm+".vm"):
if f != 'img':
r("rm", "-r", vm+".vm/"+f)
else:
r("rm", name(vm)+".vm")
r("rm", "-r", vm+".vm")
@cmd
def remove(vm: str, keep_image: bool = False):
vm = name_to_id(vm)
poweroff_and_wait(vm)
remove_force(vm, keep_image=keep_image)
##########################################################
if is_daemon:
import socket
import struct
try:
os.unlink(socket_path)
except OSError:
if os.path.exists(socket_path):
raise
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
server.bind(socket_path)
os.chmod(socket_path, 0o777)
try:
while True:
server.listen(1)
print('Server is listening for incoming connections...')
connection, client_address = server.accept()
print('Connection from', str(connection), connection, client_address)
_ucred = struct.Struct("=iII")
pid, uid, gid = _ucred.unpack(connection.getsockopt(socket.SOL_SOCKET, socket.SO_PEERCRED, _ucred.size))
ucred = Ucred(pid, uid, gid)
in_data = recvall(connection)
print("IN", in_data)
in_struct = json.loads(in_data)
print("IN", in_struct)
f = daemon_funcs[in_struct["fname"]]
try:
res = f(ucred, *in_struct["arg"], **in_struct["kvarg"])
except Exception as e:
import traceback
traceback.print_exception(e)
out_struct = {'excepttion': str(type(e))}
else:
out_struct = {'return': res}
print("OUT", out_struct)
out_data = json.dumps(out_struct).encode('utf-8')
print("OUT", out_data)
connection.sendall(out_data)
connection.close()
finally:
# close the connection
# remove the socket file
os.unlink(socket_path)
exit(1)
if __name__ == "__main__":
args = parser.parse_args()
print(args)
if not args.subcommand:
parser.print_help()
else:
import inspect
force = args.force
if args.root_folder is not None:
root_folder = args.root_folder+"/"
f = subcommands[args.subcommand]
spec = get_spec(f)
f_kvarg = {}
f_arg = []
for i, arg in enumerate(spec.args):
has_default = spec.defaults is not None and i >= len(spec.args) - len(spec.defaults)
if has_default:
if args.__dict__[arg] is not None:
f_kvarg[arg] = args.__dict__[arg]
else:
f_arg.append(args.__dict__[arg])
if spec.varargs is not None:
arg = spec.varargs
annotation = spec.annotations.get(arg, None)
if annotation == tuple[str, ...]:
f_arg += args.__dict__[arg]
print(f_arg, f_kvarg)
r = f(*f_arg, **f_kvarg)
if r is not None: print(r)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment