[ Avaa Bypassed ]




Upload:

Command:

www-data@3.144.250.2: ~ $
#!/usr/bin/env python3
# vim: set expandtab shiftwidth=4:
# -*- Mode: python; coding: utf-8; indent-tabs-mode: nil -*- */
#
# Copyright © 2018 Red Hat, Inc.
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the "Software"),
# to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense,
# and/or sell copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice (including the next
# paragraph) shall be included in all copies or substantial portions of the
# Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL
# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.

import os
import sys
import time
import math
import multiprocessing
import argparse
from pathlib import Path

try:
    import libevdev
    import yaml
    import pyudev
except ModuleNotFoundError as e:
    print("Error: {}".format(e), file=sys.stderr)
    print(
        "One or more python modules are missing. Please install those "
        "modules and re-run this tool."
    )
    sys.exit(1)


SUPPORTED_FILE_VERSION = 1


def error(msg, **kwargs):
    print(msg, **kwargs, file=sys.stderr)


class YamlException(Exception):
    pass


def fetch(yaml, key):
    """Helper function to avoid confusing a YAML error with a
    normal KeyError bug"""
    try:
        return yaml[key]
    except KeyError:
        raise YamlException("Failed to get '{}' from recording.".format(key))


def check_udev_properties(yaml_data, uinput):
    """
    Compare the properties our new uinput device has with the ones from the
    recording and ring the alarm bell if one of them is off.
    """
    yaml_udev_section = fetch(yaml_data, "udev")
    yaml_udev_props = fetch(yaml_udev_section, "properties")
    yaml_props = {
        k: v for (k, v) in [prop.split("=", maxsplit=1) for prop in yaml_udev_props]
    }
    try:
        # We don't assign this one to virtual devices
        del yaml_props["LIBINPUT_DEVICE_GROUP"]
    except KeyError:
        pass

    # give udev some time to catch up
    time.sleep(0.2)
    context = pyudev.Context()
    udev_device = pyudev.Devices.from_device_file(context, uinput.devnode)
    for name, value in udev_device.properties.items():
        if name in yaml_props:
            if yaml_props[name] != value:
                error(
                    f"Warning: udev property mismatch: recording has {name}={yaml_props[name]}, device has {name}={value}"
                )
            del yaml_props[name]
        else:
            # The list of properties we add to the recording, see libinput-record.c
            prefixes = (
                "ID_INPUT",
                "LIBINPUT",
                "EVDEV_ABS",
                "MOUSE_DPI",
                "POINTINGSTICK_",
            )
            for prefix in prefixes:
                if name.startswith(prefix):
                    error(f"Warning: unexpected property: {name}={value}")

    # the ones we found above were removed from the dict
    for name, value in yaml_props.items():
        error(f"Warning: device is missing recorded udev property: {name}={value}")


def create(device):
    evdev = fetch(device, "evdev")

    d = libevdev.Device()
    d.name = fetch(evdev, "name")

    ids = fetch(evdev, "id")
    if len(ids) != 4:
        raise YamlException("Invalid ID format: {}".format(ids))
    d.id = dict(zip(["bustype", "vendor", "product", "version"], ids))

    codes = fetch(evdev, "codes")
    for evtype, evcodes in codes.items():
        for code in evcodes:
            data = None
            if evtype == libevdev.EV_ABS.value:
                values = fetch(evdev, "absinfo")[code]
                absinfo = libevdev.InputAbsInfo(
                    minimum=values[0],
                    maximum=values[1],
                    fuzz=values[2],
                    flat=values[3],
                    resolution=values[4],
                )
                data = absinfo
            elif evtype == libevdev.EV_REP.value:
                if code == libevdev.EV_REP.REP_DELAY.value:
                    data = 500
                elif code == libevdev.EV_REP.REP_PERIOD.value:
                    data = 20
            d.enable(libevdev.evbit(evtype, code), data=data)

    properties = fetch(evdev, "properties")
    for prop in properties:
        d.enable(libevdev.propbit(prop))

    uinput = d.create_uinput_device()

    check_udev_properties(device, uinput)

    return uinput


def print_events(devnode, indent, evs):
    devnode = os.path.basename(devnode)
    for e in evs:
        print(
            "{}: {}{:06d}.{:06d} {} / {:<20s} {:4d}".format(
                devnode,
                " " * (indent * 8),
                e.sec,
                e.usec,
                e.type.name,
                e.code.name,
                e.value,
            )
        )


def collect_events(frame):
    evs = []
    events_skipped = False
    for (sec, usec, evtype, evcode, value) in frame:
        if evtype == libevdev.EV_KEY.value and value == 2:  # key repeat
            events_skipped = True
            continue

        e = libevdev.InputEvent(
            libevdev.evbit(evtype, evcode), value=value, sec=sec, usec=usec
        )
        evs.append(e)

    # If we skipped some events and now all we have left is the
    # SYN_REPORTs, we drop the SYN_REPORTs as well.
    if events_skipped and all(e for e in evs if e.matches(libevdev.EV_SYN.SYN_REPORT)):
        return []
    else:
        return evs


def replay(device, verbose):
    events = fetch(device, "events")
    if events is None:
        return
    uinput = device["__uinput"]

    # The first event may have a nonzero offset but we want to replay
    # immediately regardless. When replaying multiple devices, the first
    # offset is the offset from the first event on any device.
    offset = time.time() - device["__first_event_offset"]

    if offset < 0:
        error("WARNING: event time offset is in the future, refusing to replay")
        return

    # each 'evdev' set contains one SYN_REPORT so we only need to check for
    # the time offset once per event
    for event in events:
        try:
            evdev = fetch(event, "evdev")
        except YamlException:
            continue

        evs = collect_events(evdev)
        if not evs:
            continue

        evtime = evs[0].sec + evs[0].usec / 1e6 + offset
        now = time.time()

        if evtime - now > 150 / 1e6:  # 150 µs error margin
            time.sleep(evtime - now - 150 / 1e6)

        uinput.send_events(evs)
        if verbose:
            print_events(uinput.devnode, device["__index"], evs)


def first_timestamp(device):
    events = fetch(device, "events")
    for e in events or []:
        try:
            evdev = fetch(e, "evdev")
            (sec, usec, *_) = evdev[0]
            return sec + usec / 1.0e6
        except YamlException:
            pass

    return None


def wrap(func, *args):
    try:
        func(*args)
    except KeyboardInterrupt:
        pass


def loop(args, recording):
    devices = fetch(recording, "devices")

    first_timestamps = tuple(
        filter(lambda x: x is not None, [first_timestamp(d) for d in devices])
    )
    # All devices need to start replaying at the same time, so let's find
    # the very first event and offset everything by that timestamp.
    toffset = min(first_timestamps or [math.inf])

    for idx, d in enumerate(devices):
        uinput = create(d)
        print("{}: {}".format(uinput.devnode, uinput.name))
        d["__uinput"] = uinput  # cheaper to hide it in the dict then work around it
        d["__index"] = idx
        d["__first_event_offset"] = toffset

    if not first_timestamps:
        input("No events in recording. Hit enter to quit")
        return

    while True:
        if args.replay_after >= 0:
            time.sleep(args.replay_after)
        else:
            input("Hit enter to start replaying")

        processes = []
        for d in devices:
            p = multiprocessing.Process(target=wrap, args=(replay, d, args.verbose))
            processes.append(p)

        for p in processes:
            p.start()

        for p in processes:
            p.join()

        del processes

        if args.once:
            break


def create_device_quirk(device):
    try:
        quirks = fetch(device, "quirks")
        if not quirks:
            return None
    except YamlException:
        return None
    # Where the device has a quirk, we match on name, vendor and product.
    # That's the best match we can assemble here from the info we have.
    evdev = fetch(device, "evdev")
    name = fetch(evdev, "name")
    id = fetch(evdev, "id")
    quirk = (
        "[libinput-replay {name}]\n"
        "MatchName={name}\n"
        "MatchVendor=0x{id[1]:04X}\n"
        "MatchProduct=0x{id[2]:04X}\n"
    ).format(name=name, id=id)
    quirk += "\n".join(quirks)
    return quirk


def setup_quirks(recording):
    devices = fetch(recording, "devices")
    overrides = None
    quirks = []
    for d in devices:
        if "quirks" in d:
            quirk = create_device_quirk(d)
            if quirk:
                quirks.append(quirk)
    if not quirks:
        return None

    overrides = Path("/etc/libinput/local-overrides.quirks")
    if overrides.exists():
        print(
            "{} exists, please move it out of the way first".format(overrides),
            file=sys.stderr,
        )
        sys.exit(1)

    overrides.parent.mkdir(exist_ok=True)
    with overrides.open("w+") as fd:
        fd.write("# This file was generated by libinput replay\n")
        fd.write("# Unless libinput replay is running right now, remove this file.\n")
        fd.write("\n\n".join(quirks))

    return overrides


def check_file(recording):
    version = fetch(recording, "version")
    if version != SUPPORTED_FILE_VERSION:
        raise YamlException(
            "Invalid file format: {}, expected {}".format(
                version, SUPPORTED_FILE_VERSION
            )
        )

    ndevices = fetch(recording, "ndevices")
    devices = fetch(recording, "devices")
    if ndevices != len(devices):
        error(
            "WARNING: truncated file, expected {} devices, got {}".format(
                ndevices, len(devices)
            )
        )


def main():
    parser = argparse.ArgumentParser(description="Replay a device recording")
    parser.add_argument(
        "recording",
        metavar="recorded-file.yaml",
        type=str,
        help="Path to device recording",
    )
    parser.add_argument(
        "--replay-after",
        type=int,
        default=-1,
        help="Automatically replay once after N seconds",
    )
    parser.add_argument(
        "--once",
        action="store_true",
        default=False,
        help="Stop and exit after one replay",
    )
    parser.add_argument("--verbose", action="store_true")
    args = parser.parse_args()

    quirks_file = None

    try:
        with open(args.recording) as f:
            y = yaml.safe_load(f)
            check_file(y)
            quirks_file = setup_quirks(y)
            loop(args, y)
    except KeyboardInterrupt:
        pass
    except (PermissionError, OSError) as e:
        error("Error: failed to open device: {}".format(e))
    except YamlException as e:
        error("Error: failed to parse recording: {}".format(e))
    finally:
        if quirks_file:
            quirks_file.unlink()


if __name__ == "__main__":
    main()

Filemanager

Name Type Size Permission Actions
libinput-analyze File 62.32 KB 0755
libinput-analyze-per-slot-delta File 13.58 KB 0755
libinput-analyze-recording File 7.66 KB 0755
libinput-analyze-touch-down-state File 6.49 KB 0755
libinput-debug-events File 95.45 KB 0755
libinput-debug-gui File 99.52 KB 0755
libinput-debug-tablet File 75.32 KB 0755
libinput-list-devices File 70.24 KB 0755
libinput-list-kernel-devices File 4.08 KB 0755
libinput-measure File 62.32 KB 0755
libinput-measure-fuzz File 16.22 KB 0755
libinput-measure-touch-size File 11.4 KB 0755
libinput-measure-touchpad-pressure File 12.51 KB 0755
libinput-measure-touchpad-size File 13.51 KB 0755
libinput-measure-touchpad-tap File 8.33 KB 0755
libinput-quirks File 62.38 KB 0755
libinput-record File 110.57 KB 0755
libinput-replay File 11.82 KB 0755
libinput-test File 62.32 KB 0755