Skip to content

Add DeferredCommand #51

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions commands2/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from .commandscheduler import CommandScheduler
from .conditionalcommand import ConditionalCommand
from .deferredcommand import DeferredCommand
from .exceptions import IllegalCommandUse
from .functionalcommand import FunctionalCommand
from .instantcommand import InstantCommand
Expand Down Expand Up @@ -38,6 +39,7 @@
"Command",
"CommandScheduler",
"ConditionalCommand",
"DeferredCommand",
"FunctionalCommand",
"IllegalCommandUse",
"InstantCommand",
Expand Down
63 changes: 63 additions & 0 deletions commands2/deferredcommand.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# validated: 2024-01-24 DS 192a28af4731 DeferredCommand.java
from typing import Callable

from wpiutil import SendableBuilder

from .command import Command
from .commandscheduler import CommandScheduler
from .printcommand import PrintCommand
from .subsystem import Subsystem


class DeferredCommand(Command):
"""
Defers Command construction to runtime. Runs the command returned by the supplier when this
command is initialized, and ends when it ends. Useful for performing runtime tasks before
creating a new command. If this command is interrupted, it will cancel the command.

Note that the supplier *must* create a new Command each call. For selecting one of a
preallocated set of commands, use :class:`commands2.SelectCommand`.
"""

def __init__(self, supplier: Callable[[], Command], *requirements: Subsystem):
"""
:param supplier: The command supplier
:param requirements: The command requirements.
"""
super().__init__()

assert callable(supplier)

self._null_command = PrintCommand(
f"[DeferredCommand] Supplied command (from {supplier!r} was None!"
)
self._supplier = supplier
self._command = self._null_command
self.addRequirements(*requirements)

def initialize(self):
cmd = self._supplier()
if cmd is not None:
self._command = cmd
CommandScheduler.getInstance().registerComposedCommands([self._command])
self._command.initialize()

def execute(self):
self._command.execute()

def isFinished(self):
return self._command.isFinished()

def end(self, interrupted):
self._command.end(interrupted)
self._command = self._null_command

def initSendable(self, builder: SendableBuilder):
super().initSendable(builder)
builder.addStringProperty(
"deferred",
lambda: "null"
if self._command is self._null_command
else self._command.getName(),
lambda _: None,
)
63 changes: 63 additions & 0 deletions tests/test_deferred_command.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import commands2

from util import * # type: ignore


def test_deferred_functions(scheduler: commands2.CommandScheduler):
inner_command = commands2.Command()
command = commands2.DeferredCommand(lambda: inner_command)

start_spying_on(inner_command)
start_spying_on(command)

command.initialize()
verify(inner_command).initialize()

command.execute()
verify(inner_command).execute()

assert not command.isFinished()
verify(inner_command).isFinished()

inner_command.isFinished = lambda: True
assert command.isFinished()
verify(inner_command, times=times(2)).isFinished()

command.end(False)
verify(inner_command).end(False)


def test_deferred_supplier_only_called_during_init(
scheduler: commands2.CommandScheduler,
):
supplier_called = 0

def supplier() -> commands2.Command:
nonlocal supplier_called
supplier_called += 1
return commands2.Command()

command = commands2.DeferredCommand(supplier)
assert supplier_called == 0

scheduler.schedule(command)
assert supplier_called == 1
scheduler.run()

scheduler.schedule(command)
assert supplier_called == 1


def test_deferred_requirements(scheduler: commands2.CommandScheduler):
subsystem = commands2.Subsystem()
command = commands2.DeferredCommand(lambda: commands2.Command(), subsystem)

assert subsystem in command.getRequirements()


def test_deferred_null_command(scheduler: commands2.CommandScheduler):
command = commands2.DeferredCommand(lambda: None) # type: ignore
command.initialize()
command.execute()
command.isFinished()
command.end(False)