generated from SteamDeckHomebrew/decky-plugin-template
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
167 lines (142 loc) · 5.39 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
import decky_plugin
import queue
from settings import SettingsManager
def import_third_party_lib():
import sys
from pathlib import Path
plugin_dir = Path(__file__).parent.resolve()
decky_plugin.logger.info(f'plugin dir: {plugin_dir}')
sys.path.insert(0, str(plugin_dir))
sys.path.insert(0, str(plugin_dir.joinpath("lib")))
def setup_environ_vars():
import os
os.environ['XDG_RUNTIME_DIR'] = '/run/user/1000'
os.environ['DBUS_SESSION_BUS_ADDRESS'] = 'unix:path=/run/user/1000/bus'
os.environ['HOME'] = '/home/deck'
import_third_party_lib()
setup_environ_vars()
settings_dir = decky_plugin.DECKY_PLUGIN_SETTINGS_DIR
settings = SettingsManager(name="settings", settings_directory=settings_dir)
event_queue = queue.Queue()
from dbus_next.aio import MessageBus
from dbus_next import Message, MessageType
from dbus_next.service import ServiceInterface, method, dbus_property, signal
bus = None
class AppRequest:
def __init__(self, sender, cookie, application, reason):
self.sender = sender
self.cookie = cookie
self.application = application
self.reason = reason
async def is_connected(self):
global bus
message = Message(
destination='org.freedesktop.DBus',
path='/org/freedesktop/DBus',
interface='org.freedesktop.DBus',
member='GetConnectionUnixProcessID',
signature='s',
body=[self.sender]
)
reply = await bus.call(message)
return reply.message_type != MessageType.ERROR
class BaseInterface(ServiceInterface):
ignore_application = ["Steam", "./steamwebhelper"]
request_map = {}
cookie = 0
def __init__(self, service):
super().__init__(service)
@method()
async def Inhibit(self, application: 's', reason: 's') -> 'u':
if application in BaseInterface.ignore_application: return 0
decky_plugin.logger.info(f'called Inhibit with application={application} and reason={reason}')
event_queue.put({"type": "Inhibit"})
sender = ServiceInterface.last_msg.sender
BaseInterface.cookie += 1
BaseInterface.request_map[BaseInterface.cookie] = AppRequest(sender, BaseInterface.cookie, application, reason)
return BaseInterface.cookie
@method()
def UnInhibit(self, cookie: 'u'):
if cookie == 0: return
decky_plugin.logger.info(f'called UnInhibit with cookie={cookie}')
if BaseInterface.request_map.pop(cookie, None) is None:
decky_plugin.logger.info(f'cannot find cookie={cookie}')
if len(BaseInterface.request_map) == 0:
event_queue.put({"type": "UnInhibit"})
class InhibitInterface(BaseInterface):
def __init__(self):
super().__init__('org.freedesktop.ScreenSaver')
class PMInhibitInterface(BaseInterface):
def __init__(self):
super().__init__('org.freedesktop.PowerManagement.Inhibit')
async def stop_dbus():
global bus
try:
if bus is not None:
bus.disconnect()
bus = None
except Exception as e:
decky_plugin.logger.info(f"error: {e}")
async def start_dbus():
global bus
await stop_dbus()
try:
bus = await MessageBus().connect()
interface = InhibitInterface()
pm_interface = PMInhibitInterface()
bus.export('/ScreenSaver', interface) # vlc
bus.export('/org/freedesktop/ScreenSaver', interface) # chrome
bus.export('/org/freedesktop/PowerManagement/Inhibit', pm_interface) # wiliwili
await bus.request_name('org.freedesktop.PowerManagement')
await bus.request_name('org.freedesktop.ScreenSaver')
except Exception as e:
decky_plugin.logger.info(f"error: {e}")
class Plugin:
async def start_backend(self):
decky_plugin.logger.info("Start backend server")
await start_dbus()
async def stop_backend(self):
decky_plugin.logger.info("Stop backend server")
await stop_dbus()
event_queue.queue.clear()
async def is_running(self):
global bus
return bus is not None
async def get_event(self):
global bus
if bus is None:
return []
res = []
while not event_queue.empty():
try:
res.append(event_queue.get_nowait())
except queue.Empty:
continue
if len(res) > 0:
return res
# check closed dbus connection
cookies = list(BaseInterface.request_map.keys())
clear = False
for c in cookies:
connected = await BaseInterface.request_map[c].is_connected()
if not connected:
BaseInterface.request_map.pop(c)
clear = True
if clear and len(BaseInterface.request_map) == 0:
return [{"type": "UnInhibit"}]
return []
async def get_settings(self, key: str, defaults):
decky_plugin.logger.info('[settings] get {}'.format(key))
return settings.getSetting(key, defaults)
async def set_settings(self, key: str, value):
decky_plugin.logger.info('[settings] set {}: {}'.format(key, value))
return settings.setSetting(key, value)
async def _main(self):
decky_plugin.logger.info("Hello World!")
async def _unload(self):
decky_plugin.logger.info("Goodnight World!")
stop_dbus()
async def _uninstall(self):
pass
async def _migration(self):
pass