Source code for eye.helpers.remote_control

# this project is licensed under the WTFPLv2, see COPYING.txt for details

from functools import wraps
import logging
import os

from PyQt5.QtCore import QObject, Q_CLASSINFO
from PyQt5.QtDBus import QDBusConnection, QDBusVariant, QDBusMessage

from eye import pathutils
from eye.connector import CategoryMixin
from eye.helpers.intent import register_intent_listener, send_intent
from eye.qt import Slot

__all__ = ('register_remote_request', 'on_request_open', 'SimpleHandler')


LOGGER = logging.getLogger(__name__)


ROOT_OBJ = None
BUS = None


class SimpleHandler(QObject, CategoryMixin):
    """D-Bus object exposing the ``re.indigo.eye`` interface.

    Incoming ``request`` calls are forwarded as ``remote_request`` intents.
    The handler adds itself to the ``remote_control`` category on creation.
    """

    Q_CLASSINFO('D-Bus Interface', 're.indigo.eye')

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.add_category('remote_control')

    # One overload per supported argument count/type combination.
    @Slot(str, result=QDBusVariant)
    @Slot(str, QDBusVariant, result=QDBusVariant)
    @Slot(str, str, result=QDBusVariant)
    @Slot(str, QDBusVariant, QDBusVariant, result=QDBusVariant)
    @Slot(str, str, str, result=QDBusVariant)
    @Slot(str, QDBusVariant, QDBusVariant, QDBusVariant, result=QDBusVariant)
    @Slot(str, str, str, str, result=QDBusVariant)
    @Slot(str, QDBusVariant, QDBusVariant, QDBusVariant, QDBusVariant, result=QDBusVariant)
    @Slot(str, QDBusVariant, QDBusVariant, QDBusVariant, QDBusVariant, QDBusVariant, result=QDBusVariant)
    @Slot(str, QDBusVariant, QDBusVariant, QDBusVariant, QDBusVariant, QDBusVariant, QDBusVariant, result=QDBusVariant)
    @Slot(str, QDBusVariant, QDBusVariant, QDBusVariant, QDBusVariant, QDBusVariant, QDBusVariant, QDBusVariant, result=QDBusVariant)
    @Slot(str, QDBusVariant, QDBusVariant, QDBusVariant, QDBusVariant, QDBusVariant, QDBusVariant, QDBusVariant, QDBusVariant, result=QDBusVariant)
    def request(self, request_type, *args):
        """Forward a remote request as a ``remote_request`` intent.

        :param request_type: name of the request
        :param args: request arguments; any :class:`QDBusVariant` is
                     unwrapped with ``variant()`` before being forwarded
        :returns: the intent result wrapped in a :class:`QDBusVariant`;
                  a ``None`` result is replaced by ``False``
        """
        unwrapped = tuple(
            item.variant() if isinstance(item, QDBusVariant) else item
            for item in args
        )
        LOGGER.debug('received request %r%r', request_type, unwrapped)

        reply = send_intent(self, 'remote_request', request_type=request_type, args=unwrapped)
        reply = False if reply is None else reply

        LOGGER.debug('replying %r to request %r%r', reply, request_type, unwrapped)
        return QDBusVariant(reply)
def register_remote_request(request_type, stackoffset=0):
    """Decorate a function so it handles remote requests of `request_type`.

    A listener for the ``remote_request`` intent (category
    ``remote_control``) is registered. When an intent with a matching
    request type arrives, and the decorated function has no falsy
    ``enabled`` attribute, the function is called with the request
    arguments and its return value is accepted as the intent result.

    The decorated function itself is returned unchanged.

    :param request_type: request name the function responds to
    :param stackoffset: extra offset added to the ``stackoffset`` passed to
                        :func:`register_intent_listener`
    """
    def decorator(func):
        def wrapper(remote, intent):
            # Not our request type: let other listeners handle it.
            if intent.info.request_type != request_type:
                return False
            # The handler can be switched off through its `enabled` attribute.
            if not getattr(func, 'enabled', True):
                return False

            outcome = func(intent.info.args)
            intent.accept(outcome)
            return True

        listen = register_intent_listener(
            'remote_request',
            categories='remote_control',
            stackoffset=(1 + stackoffset),
        )
        listen(wraps(func)(wrapper))

        return func

    return decorator
def create_server():
    """Export the remote-control handler on the D-Bus session bus.

    A new :class:`SimpleHandler` is stored in the module-level ``ROOT_OBJ``
    and the session bus connection in ``BUS``. The ``re.indigo.eye`` service
    is then registered and the handler exported at ``/``.

    Both registration calls report failure through their boolean return
    value; a failure is logged as a warning rather than ignored.
    """
    global BUS, ROOT_OBJ

    ROOT_OBJ = SimpleHandler()
    BUS = QDBusConnection.sessionBus()

    if not BUS.registerService('re.indigo.eye'):
        LOGGER.warning(
            'could not register D-Bus service %r: %s',
            're.indigo.eye', BUS.lastError().message(),
        )
    if not BUS.registerObject('/', ROOT_OBJ, QDBusConnection.ExportAllContents):
        LOGGER.warning(
            'could not register D-Bus object %r: %s',
            '/', BUS.lastError().message(),
        )


def send_request(req, *args):
    """Call the ``request`` method of the ``re.indigo.eye`` D-Bus service.

    The module-level ``BUS`` is set to the session bus connection.

    :param req: request type, sent as the first method argument
    :param args: additional method arguments
    :returns: the reply arguments, as a list
    :raises ValueError: if the reply is a D-Bus error message
    """
    global BUS

    LOGGER.debug('sending request %r%r', req, args)

    method_args = [req]
    method_args.extend(args)

    msg = QDBusMessage.createMethodCall('re.indigo.eye', '/', 're.indigo.eye', 'request')
    msg.setArguments(method_args)

    BUS = QDBusConnection.sessionBus()
    reply = BUS.call(msg)

    if reply.type() == QDBusMessage.ErrorMessage:
        raise ValueError(reply.errorMessage())
    return list(reply.arguments())


@register_remote_request('ping')
def on_request_ping(args):
    """Handle the remote "ping" request by returning True."""
    return True
@register_remote_request('open')
def on_request_open(args):
    """Handle the remote "open" request by sending an ``open_editor`` intent.

    The first argument is parsed with :func:`eye.pathutils.parse_filename`
    into a path, row and column. The path is made absolute and sent together
    with the location in an ``open_editor`` intent whose reason is
    ``'remote'``. The location is ``None`` when no row was parsed, else
    ``(row, col)``.

    :param args: request arguments; only the first one is used
    """
    if not args:
        # A remote caller may send "open" with no file name; skip the request
        # instead of failing with IndexError on args[0].
        LOGGER.warning('ignoring "open" request without a file name')
        return

    path, row, col = pathutils.parse_filename(args[0])
    path = os.path.abspath(path)
    loc = None if row is None else (row, col)

    send_intent(ROOT_OBJ, 'open_editor', path=path, loc=loc, reason='remote')