bluefly

This module contains some experiments in writing flyscan Devices for bluesky

Installation

Once an initial release has been made, you will be able to:

pip install bluefly

Architecture

This project aims to implement the minimum amount needed to demonstrate fly scanning in the style of Malcolm, updated to use type hints and asyncio. It separates interface classes from logic functions, and provides a generic FlyDevice that wraps user-written FlyLogic classes.

FlyDevice

Let’s start at the top. The interface up to bluesky is a “flyable” Device, used like this:

fly_device = FlyDevice([...], MyFlyLogic(...))
# This means "tell your devices you are about to take data"
# Detectors flag that they should open a new file, but don't
# do anything about it yet
fly_device.stage()
# Store the flyscan trajectory and the duration of each point.
# This doesn't need repeating if the trajectory stays the same
generator = CompoundGenerator(
    generators=[
        LineGenerator("ty", "mm", 0, 1, 2),
        LineGenerator("tx", "mm", 1, 2, 20),
    ],
    duration=0.1,
)
fly_device.configure(dict(generator=generator))
# Actually open the file as this is the first kickoff
await fly_device.kickoff()
# We can now get the HDF file as a resource
list(fly_device.collect())
list(fly_device.collect_asset_docs())
# Actually configure the hardware and start the flyscan.
# Motors will move to the start, detectors will arm, then the
# scan will start. The returned Status supports a progress bar
status = fly_device.complete()
...
# Ctrl-C twice will call this, stopping the motors
fly_device.pause()
# Flag the fly_device as resuming from the last complete point, rather than
# from the beginning
fly_device.resume()
# This has nothing to do as the file is already open
await fly_device.kickoff()
# Retrace to the last complete point, then resume the flyscan
status = fly_device.complete()
...
# We can then trigger the fly_device more times to write more data
await fly_device.kickoff()
await fly_device.complete()
await fly_device.kickoff()
await fly_device.complete()
# At any point we can collect the cached data and docs from the device
list(fly_device.collect())
list(fly_device.collect_asset_docs())
# When we're done we trigger a file close
fly_device.unstage()

This device can be nested inside other scans. This allows, for instance, a series of flying mapping scans within a step-scanned energy scan (using a custom per_step function rather than one_nd_step), as sketched below.
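
A rough sketch of what such a per_step function might look like (not part of bluefly; fly_device and energy in the usage comment are illustrative names, and the flyer is assumed to have already been configured with its trajectory):

import bluesky.plan_stubs as bps
from bluesky.plan_stubs import move_per_step

def fly_per_step(detectors, step, pos_cache):
    # Move the outer step scanned motors (e.g. a mono energy) to this point
    yield from move_per_step(step, pos_cache)
    # Run one repetition of each nested flyscan and emit its data
    for flyer in detectors:
        yield from bps.kickoff(flyer, wait=True)
        yield from bps.complete(flyer, wait=True)
        yield from bps.collect(flyer, stream=True)

# e.g. RE(bp.scan([fly_device], energy, 7.0, 7.1, 11, per_step=fly_per_step))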

The FlyDevice handles opening and closing the detectors’ files, while the FlyLogic handles moving motors and arming and triggering detectors.

Writing the logic

Each flyscan is unique in some way, so writing a FlyLogic should be simple and readable. The scan() method should start from a given point, placing data at an offset in the file, and call back whenever a detector has produced frames. The stop() method is used on pause to stop motion, triggering and the detectors. Implementations make heavy use of descriptive library functions that are combined to make a set of logic:

@dataclass
class PMACMasterFlyLogic(FlyLogic):
    pmac: pmac.PMAC
    motors: List[motor.MotorDevice]

    async def scan(
        self,
        detectors: Sequence[DetectorDevice],
        points: RemainingPoints,
        offset: int,
        callback: Callable[[str, int], None],
    ):
        # Prepare the motors and arm detectors
        period, num = points.constant_duration, points.remaining
        tracker, _, _ = await asyncio.gather(
            pmac.build_initial_trajectory(self.pmac, self.motors, points),
            pmac.move_to_start(self.pmac, self.motors, points.peek_point()),
            detector.arm_detectors_triggered(detectors, num, offset, period),
        )
        # Kick off pmac, then show the progress of detectors
        await asyncio.gather(
            pmac.keep_filling_trajectory(self.pmac, tracker),
            detector.collect_detectors(detectors, num, callback),
        )

    async def stop(self, detectors: Sequence[DetectorDevice]):
        await asyncio.gather(
            detector.stop_detectors(detectors), pmac.stop_trajectory(self.pmac)
        )

Note: the base class FlyLogic is an interface class with only NotImplemented methods.
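
As a rough sketch (inferred from the implementation above rather than taken from the source), it might look something like this:

from typing import Callable, Sequence

class FlyLogic:
    async def scan(
        self,
        detectors: Sequence[DetectorDevice],
        points: RemainingPoints,
        offset: int,
        callback: Callable[[str, int], None],
    ):
        """Scan the given points, writing data at the given offset in the
        file, calling back as detectors produce frames"""
        raise NotImplementedError(self)

    async def stop(self, detectors: Sequence[DetectorDevice]):
        """Stop motion, triggering and the given detectors"""
        raise NotImplementedError(self)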

DetectorDevice

Detectors are SWMR HDF writing devices that are step scannable as well as usable within flyscans. They are backed by a DetectorLogic that can be used in both step scans and fly scans:

  • open() opens an HDF file in the given directory, returning details of the datasets it will write
  • get_deadtime() returns the deadtime needed between triggers for a given exposure time
  • arm() arms the detector for a number of frames at a given exposure, placed at a given offset in the file, in software, triggered or gated mode
  • collect() waits until the given number of frames have been collected, giving periodic progress updates via a callback
  • stop() stops the detector
  • close() closes the file

Again here, implementations make heavy use of library functions, so writing an implementation for each detector shouldn’t be too verbose:

@dataclass
class AndorLogic(DetectorLogic):
    driver: DetectorDriver
    hdf: HDFWriter
    _hdf_start_task: asyncio.Task = field(init=False)

    async def open(self, file_prefix: str) -> HDFResource:
        resource = HDFResource(
            data=[HDFDatasetResource()],
            summary=HDFDatasetResource("sum", "/entry/sum"),
            file_path=file_prefix + ".h5",
        )
        self._hdf_start_task = await open_hdf_file(self.hdf, resource)
        return resource

    async def get_deadtime(self, exposure: float) -> float:
        # Might need to prod the driver to do these calcs
        return calc_deadtime(exposure, readout_time=0.02, frequency_accuracy=50)

    async def arm(self, num: int, offset: int, mode: DetectorMode, exposure: float):
        # Choose the right Andor specific trigger mode (made up examples)
        await self.driver.trigger_mode.set(
            {
                DetectorMode.SOFTWARE: "Immediate",
                DetectorMode.TRIGGERED: "External",
                DetectorMode.GATED: "Gated",
            }[mode]
        )
        # Setup driver and HDF writer for n frames
        await setup_n_frames(self.driver, self.hdf, num, offset, exposure)
        # Need to overwrite here for this driver in particular
        period = exposure + await self.get_deadtime(exposure)
        await self.driver.acquire_period.set(period)
        # Kick off the driver to take data
        asyncio.create_task(self.driver.start())

    async def collect(self, num: int, callback: Callable[[int], None]):
        # Monitor progress, calling back on each flush
        await hdf_flush_and_observe(self.hdf, num, callback)

    async def stop(self):
        await self.driver.stop()

    async def close(self):
        await self.hdf.stop()
        await self._hdf_start_task

Note: the base class DetectorLogic is an interface class with only NotImplemented methods.
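
Again as a rough sketch (inferred from the AndorLogic implementation above rather than taken from the source), it might look something like this:

from typing import Callable

class DetectorLogic:
    async def open(self, file_prefix: str) -> HDFResource:
        """Open an HDF file, returning details of the datasets it will write"""
        raise NotImplementedError(self)

    async def get_deadtime(self, exposure: float) -> float:
        """Return the deadtime needed between triggers for the given exposure"""
        raise NotImplementedError(self)

    async def arm(self, num: int, offset: int, mode: DetectorMode, exposure: float):
        """Arm for num frames at the given exposure, written at offset in the file"""
        raise NotImplementedError(self)

    async def collect(self, num: int, callback: Callable[[int], None]):
        """Wait for num frames to be collected, calling back with progress"""
        raise NotImplementedError(self)

    async def stop(self):
        """Stop the detector"""
        raise NotImplementedError(self)

    async def close(self):
        """Close the HDF file"""
        raise NotImplementedError(self)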

When using a DetectorDevice in a step scan, it reads the summary data back from the HDF file to pass back to bluesky:

andor_logic = areadetector.AndorLogic(
    areadetector.DetectorDriver("BLxxI-EA-DET-01:DRV"),
    areadetector.HDFWriter("BLxxI-EA-DET-01:HDF5"),
)
andor = detector.DetectorDevice(andor_logic)
# Flag that the HDF file needs opening on next trigger
andor.stage()
# Open the file, then arm and collect a single frame from the detector.
# The returned Status supports a progress bar
status = andor.trigger()
...
# Data and shape read from HDF file
andor.describe()
andor.read()
list(andor.collect_asset_docs())
# Take more data after moving motors etc.
await andor.trigger()
andor.read()
await andor.trigger()
andor.read()
list(andor.collect_asset_docs())
# Close the HDF file
andor.unstage()

MotorDevice

Motors are again similar to ophyd. They are not strictly needed for fly scanning, as we make use of a trajectory scan interface on the PMAC, but they are convenient for tying a name to a motor record. It turns out to be easy to make them step scannable too, so that is also added. They take a motor record class, but can also wrap classes like PMACRawMotor that add extra signals.
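
For example (a minimal sketch, reusing names defined in the startup script further down):

# Wrap a PMAC raw motor record in a settable MotorDevice
t1x = motor.MotorDevice(pmac.PMACRawMotor("BLxxI-MO-TABLE-01:X"))
# It can then be step scanned like any other settable
RE(bp.scan([andor], t1x, 0, 1, 10))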

HasSignals

The lowest level objects in bluefly (DetectorDriver, HDFWriter, MotorRecord, PMACCoord) are just containers for Signals. There is a utility class that lets them be defined like this:

class MotorRecord(HasSignals):
    demand: SignalRW[float]
    readback: SignalR[float]
    done_move: SignalR[bool]
    acceleration_time: SignalRW[float]
    velocity: SignalRW[float]
    max_velocity: SignalRW[float]
    # Actually read/write, but shouldn't write from scanning code
    resolution: SignalR[float]
    offset: SignalR[float]
    egu: SignalR[str]
    precision: SignalR[float]
    stop: SignalX

This makes the Signals friendly for mypy checking, but requires some extra data to map them to an actual Signal. This extra data is stored in the SignalProvider, and that is what fills in the Signals in the actual instance. Mapping is generally one to one, except in special cases where a dictionary mapping of several PVs to a single Signal can be specified:

@signal_sources(demands={x: f"demand_{x}" for x in CS_AXES})
class PMACCoord(HasSignals):
    port: SignalR[str]
    demands: SignalW[Dict[str, float]]
    move_time: SignalW[float]
    defer_moves: SignalW[bool]

SignalProvider

These are responsible for taking a signal prefix, the attribute names for the signals, and their type, and filling in a HasSignals structure with concrete instances:

class SignalProvider(ABC):
    @abstractmethod
    def make_signals(
        self,
        signal_prefix: str,
        details: Dict[str, SignalDetails] = {},
        add_extra_signals=False,
    ) -> AwaitableSignals:
        """For each signal detail listed in details, make a Signal of the given
        base_class. If add_extra_signals then include signals not listed in details.
        AttrName will be mapped to attr_name. Return {attr_name: signal}"""

For instance, provider.make_signals("BLxxI-MO-TABLE-01:X", {"demand": ..., "readback": ...}) would provide a dictionary of Signals with entries “demand” and “readback”, which can be set as attributes of the instance.

The mapping from signal prefix and attribute name to PV is done in a provider-specific way. I envisage that the CA and PVA structures can be built from PVI. This will expose a single PV with a JSON structure of the PVs (or pairs of PVs) of interest in a Device, with name, description, label, widget, and any other metadata that can’t be fetched live from EPICS. It is in the process of being added to areaDetector.

Startup script

These are all brought together in a typical bluesky startup script:

# Running in simulation mode?
SIM_MODE = True

with SignalCollector(), NamedDevices(), TmpFilenameScheme():
    # All Signals with a sim:// prefix or without a prefix will come from this provider
    if SIM_MODE:
        sim = SignalCollector.add_provider(sim=SimProvider(), set_default=True)
    else:
        # Do something like this here
        # ca = SignalCollector.add_provider(ca=CAProvider(), set_default=True)
        pass
    # A PMAC has a trajectory scan interface and 16 Co-ordinate systems
    # which may have motors in them
    pmac1 = pmac.PMAC("BLxxI-MO-PMAC-01:")
    # Raw motors assigned to a single CS, settable for use in step scans
    t1x = motor.MotorDevice(pmac.PMACRawMotor("BLxxI-MO-TABLE-01:X"))
    t1y = motor.MotorDevice(pmac.PMACRawMotor("BLxxI-MO-TABLE-01:Y"))
    t1z = motor.MotorDevice(pmac.PMACRawMotor("BLxxI-MO-TABLE-01:Z"))
    # Simulated detector
    andor_logic = areadetector.AndorLogic(
        areadetector.DetectorDriver("BLxxI-EA-DET-01:DRV"),
        areadetector.HDFWriter("BLxxI-EA-DET-01:HDF5"),
    )
    andor = detector.DetectorDevice(andor_logic)
    # Define a flyscan that can move any combination of these 3 motors which
    # are required to be in the same CS on the pmac
    mapping = fly.FlyDevice([andor], fly.PMACMasterFlyLogic(pmac1, [t1x, t1y, t1z]))
    # Signals are connected (in a blocking way) at the end of the with block
    # and all the Devices in locals() have their names filled in

# Fill in the simulated logic
if SIM_MODE:
    pmac_sim.sim_trajectory_logic(sim, pmac1.traj, a=t1x, b=t1y)
    for m in (t1x, t1y, t1z):
        motor_sim.sim_motor_logic(sim, m)
    areadetector_sim.sim_detector_logic(
        sim, andor_logic.driver, andor_logic.hdf, t1x, t1y
    )


There are a number of Context Managers active while the Devices are defined. These follow a design pattern of defining an instance that can be stored and interacted with for the duration of the with statement (a minimal sketch of the pattern follows the list):

  • SignalCollector lets SignalProviders be registered with optional transport prefixes like ca:// to allow signal prefixes to be routed to the correct provider. The HasSignals baseclass uses this to fill in the correct Signals
  • NamedDevices sets device.name to its name as defined in locals()
  • TmpFilenameScheme() is a concrete filename scheme to allow file_prefixes to be created for each scan according to a local scheme
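
For instance, NamedDevices might be implemented something like this (an illustrative assumption, not the actual bluefly code):

import inspect

class NamedDevices:
    """Fill in device.name from the variable name it is assigned to.

    A sketch only: assumes Devices are created inside the block with name=None.
    """

    def __enter__(self):
        # Remember the caller's frame and which names already existed there
        self._frame = inspect.currentframe().f_back
        self._names_on_entry = set(self._frame.f_locals)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        for name, obj in self._frame.f_locals.items():
            # Any newly created, still unnamed Device gets named after its variable
            if name not in self._names_on_entry and getattr(obj, "name", "") is None:
                obj.name = name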

After the definitions, several pieces of simulated logic are added. The advantage of the interface classes is that simulations can be written at the Signal level rather than having to reimplement any logic. For instance, a PMAC trajectory scan:

def sim_trajectory_logic(p: SimProvider, traj: PMACTrajectory, **motors: MotorDevice):
    """Just enough of a sim to make points_scanned tick at the right rate"""
    stopping = asyncio.Event()
    times: List[float] = []
    positions: Dict[str, List[float]] = {}

    for cs_axis, motor in motors.items():
        assert cs_axis in CS_AXES, f"{cs_axis} should be one of {CS_AXES}"
        assert isinstance(motor.motor, PMACRawMotor), motor.motor
        p.set_value(motor.motor.cs_axis, cs_axis.upper())

    @p.on_call(traj.abort)
    async def do_abort():
        stopping.set()
        times.clear()
        positions.clear()

    @p.on_call(traj.build)
    @p.on_call(traj.append)
    async def do_build_append():
        for t in p.get_value(traj.times):
            times.append(t)
        for cs_axis, ps in p.get_value(traj.positions).items():
            for pp in ps:
                positions.setdefault(cs_axis, []).append(pp)
        p.set_value(traj.build_status, "Success")
        p.set_value(traj.append_status, "Success")

    @p.on_call(traj.execute)
    async def do_scan():
        # Do a fake scan that takes the right time
        stopping.clear()
        status = "Success"
        for i, t in enumerate(times):
            for cs_axis, use in p.get_value(traj.use).items():
                if use and cs_axis in motors:
                    p.set_value(motors[cs_axis].motor.readback, positions[cs_axis][i])
            try:
                # See if we got told to stop
                await asyncio.wait_for(stopping.wait(), t)
            except asyncio.TimeoutError:
                # Carry on
                p.set_value(traj.points_scanned, i + 1)
            else:
                # Stop
                status = "Aborted"
                break
        times.clear()
        positions.clear()
        p.set_value(traj.execute_status, status)

Plan

We need a special plan for flyscanning that does a periodic collect to flush data to the live graph:

def grid_fly(flyer, y, ystart, ystop, ynum, x, xstart, xstop, xnum):
    generator = CompoundGenerator(
        generators=[
            LineGenerator(y.name, "mm", ystart, ystop, ynum),
            LineGenerator(x.name, "mm", xstart, xstop, xnum),
        ],
        duration=0.1,
    )
    flyer.configure(dict(generator=generator))
    md = dict(
        hints=dict(
            gridding="rectilinear",
            dimensions=[([y.name], "primary"), ([x.name], "primary")],
        ),
        shape=(ynum, xnum),
        extents=([ystart, ystop], [xstart, xstop]),
    )
    uid = yield from bps.open_run(md)
    yield from bps.kickoff(flyer, wait=True)
    yield from bps.collect(flyer, stream=True)
    yield from bps.checkpoint()
    yield from bps.complete(flyer, group="flyer")
    for _ in range(int(ynum * xnum * 0.1)):
        yield from bps.sleep(1)
        yield from bps.collect(flyer, stream=True)
        yield from bps.checkpoint()
    yield from bps.wait(group="flyer")
    yield from bps.collect(flyer, stream=True)
    yield from bps.close_run()
    return uid

Demo

Running pipenv run ipython -i -- test/startup.py will first run a step scan of a simulated detector (with live plotting), then a flyscan over the same range. Data is written from both, but I can’t get plotting to work from the flyscan yet.

# Run a step scan
RE(bp.grid_scan([andor], t1y, 2, 4, 8, t1x, 3, 5, 10))

# Run a fly scan
RE(grid_fly(mapping, t1y, 2, 4, 8, t1x, 3, 5, 10))