Welcome to Cowlug


Home Assistant binary_sensor description

This is my document about the development of a binary_sensor platform for the Home Assistant project.

If you haven't already read the component description, please do so first.

The binary_sensor platform is different from the old one but still fairly simple. We have moved all the callback code into the platform where it makes most sense.



W800rf32 binary_sensor platform code

This is the configuration for our platform, taken from the configuration.yaml file.

Github acceptance note:
You must not include an example of this in your source code.

"""
# Example configuration.yaml entry

binary_sensor:
  - platform: w800rf32
    devices:
      c1:
        name: motion_hall
        off_delay: 5
        device_class: Motion
      a2:
        name: motion_kitchen
        device_class: Motion

"""

Start of source code

"""
Support for w800rf32 binary sensors.

For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/binary_sensor.w800rf32/
"""

Imports

The imports are a bit more involved than in the component code. We import from homeassistant.components.binary_sensor because our W800rf32BinarySensor class inherits from the HA BinarySensorDevice, from homeassistant.core for the asyncio callback decorator, and from homeassistant.helpers.dispatcher for the async_dispatcher_connect() function.

import logging

import voluptuous as vol

from homeassistant.components.binary_sensor import (
    DEVICE_CLASSES_SCHEMA, PLATFORM_SCHEMA, BinarySensorDevice)
from homeassistant.components.w800rf32 import (W800RF32_DEVICE)
from homeassistant.const import (CONF_DEVICE_CLASS, CONF_NAME, CONF_DEVICES)
from homeassistant.core import callback
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.dispatcher import (async_dispatcher_connect)

We also require the event and dt code to be imported for event time tracking. We use event time tracking for off_delay in our binary_sensor.

from homeassistant.helpers import event as evt
from homeassistant.util import dt as dt_util

_LOGGER = logging.getLogger(__name__)


Dependencies

This tells the code we depend on the w800rf32 component code.

DEPENDENCIES = ['w800rf32']

CONF_OFF_DELAY is the name of the off_delay configuration key. It has no default value at the moment; I suppose we could set one in the schema below.

CONF_OFF_DELAY = 'off_delay'

Configuration

Note: CONF_DEVICES is defined as Required, so it must be present in your configuration.yaml file. All the other values are Optional.
    
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
    vol.Required(CONF_DEVICES): {
        cv.string: vol.Schema({
            vol.Optional(CONF_NAME): cv.string,
            vol.Optional(CONF_DEVICE_CLASS): DEVICE_CLASSES_SCHEMA,
            vol.Optional(CONF_OFF_DELAY):
            vol.All(cv.time_period, cv.positive_timedelta)
        })
    },
}, extra=vol.ALLOW_EXTRA)
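
To make the effect of the schema concrete, here is a minimal sketch in plain Python of what validation does to one device entry. It is a hand-rolled stand-in, not voluptuous and not the HA helpers: normalize_device() is a hypothetical name, and it only mimics behaviour visible on this page (device_class comes out lowercased, a numeric off_delay comes out as a timedelta). Treating the number as seconds and rejecting negative values are assumptions based on the cv.time_period and cv.positive_timedelta validators named in the schema.

```python
from datetime import timedelta


def normalize_device(entry):
    """Hypothetical stand-in for the per-device part of PLATFORM_SCHEMA."""
    result = {}
    if "name" in entry:
        result["name"] = str(entry["name"])
    if "device_class" in entry:
        # The validated config on this page shows 'Motion' as 'motion'.
        result["device_class"] = str(entry["device_class"]).lower()
    if "off_delay" in entry:
        # A plain number is read as seconds (assumption for this sketch).
        delay = timedelta(seconds=entry["off_delay"])
        if delay < timedelta(0):
            raise ValueError("off_delay must not be negative")
        result["off_delay"] = delay
    return result


hall = normalize_device({"name": "motion_hall", "off_delay": 5,
                         "device_class": "Motion"})
kitchen = normalize_device({"name": "motion_kitchen",
                            "device_class": "Motion"})
```

Note that kitchen has no off_delay key at all, which is why the platform code reads the optional values with .get().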

async_setup_platform()

Called from: homeassistant.setup
Returns:

This is where we set up our platform for HA.


Here is the HA checklist for creating a platform. So let's list what we are doing in our platform:

    1. Define an async_setup_platform function and pass in the hass object, the config dictionary and the add_entities function.
    2. Import the W800rf32 module
    3. Build a list of entities from config
    4. Add the list to the hass object.

We have already talked about the hass and config objects.
The add_entities function simply registers the entities inside HA. Here is a description of the entity registry.

async def async_setup_platform(hass, config,
                               add_entities, discovery_info=None):
    """Set up the Binary Sensor platform to w800rf32."""
    binary_sensors = []

config[CONF_DEVICES] is an OrderedDict. With the YAML configuration shown at the top of this page, it looks like this:

OrderedDict([
   ('c1',
      OrderedDict([('name', 'motion_hall'), ('off_delay', datetime.timedelta(seconds=5)), ('device_class', 'motion')])),
   ('a2',
      OrderedDict([('name', 'motion_kitchen'), ('device_class', 'motion')]))])


    # device_id --> X10 device code such as "c1" or "a2". entity (a dict) --> name, device_class, etc.
    for device_id, entity in config[CONF_DEVICES].items():

        # CONF_NAME is Optional in the schema, so read it with .get()
        _LOGGER.debug("Add %s w800rf32.binary_sensor (class %s)",
                      entity.get(CONF_NAME), entity.get(CONF_DEVICE_CLASS))

For each entity, make a W800rf32BinarySensor object, append it to a list, then hand the
list to HA with add_entities().

        device = W800rf32BinarySensor(
            device_id, entity.get(CONF_NAME), entity.get(CONF_DEVICE_CLASS),
            entity.get(CONF_OFF_DELAY))

        binary_sensors.append(device)

    add_entities(binary_sensors)
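
The loop above can be tried outside HA with a minimal sketch. FakeSensor and build_sensors() are hypothetical stand-ins for W800rf32BinarySensor and async_setup_platform(); the only point is to show that .get() returns None for options a device does not set, such as off_delay for a2.

```python
from collections import OrderedDict
from datetime import timedelta


class FakeSensor:
    """Hypothetical stand-in for W800rf32BinarySensor."""

    def __init__(self, device_id, name, device_class=None, off_delay=None):
        self.device_id = device_id
        self.name = name
        self.device_class = device_class
        self.off_delay = off_delay


def build_sensors(devices):
    """Build one sensor per configured device, as the platform loop does."""
    sensors = []
    for device_id, entity in devices.items():
        # .get() yields None when an optional key is absent.
        sensors.append(FakeSensor(
            device_id, entity.get("name"), entity.get("device_class"),
            entity.get("off_delay")))
    return sensors


devices = OrderedDict([
    ("c1", {"name": "motion_hall", "off_delay": timedelta(seconds=5),
            "device_class": "motion"}),
    ("a2", {"name": "motion_kitchen", "device_class": "motion"}),
])
sensors = build_sensors(devices)
print([(s.device_id, s.name, s.off_delay) for s in sensors])
```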

W800rf32BinarySensor class

Called from:
Returns:

The W800rf32BinarySensor class overrides some of the HA BinarySensorDevice properties and functions to customize them for our sensor.
Each instance keeps the state of one binary sensor and is created in async_setup_platform().

class W800rf32BinarySensor(BinarySensorDevice):
    """A representation of a w800rf32 binary sensor."""

    def __init__(self, device_id, name, device_class=None, off_delay=None):
        """Initialize the w800rf32 sensor."""
        self._signal = W800RF32_DEVICE.format(device_id)
        self._name = name
        self._device_class = device_class
        self._off_delay = off_delay
        self._state = False
        self._delay_listener = None

_off_delay_listener is a utility function used only inside the class, as denoted by the leading underscore. It sets the state to off (False) after the delay timeout.

    @callback
    def _off_delay_listener(self, now):
        """Switch device off after a delay."""
        self._delay_listener = None
        self.update_state(False)

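The following three properties, name, should_poll and device_class, are all properties of a binary_sensor. name and device_class are originally set from the configuration.yaml file; should_poll always returns False because the sensor is updated by incoming events rather than by polling.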

    @property
    def name(self):
        """Return the device name."""
        return self._name

    @property
    def should_poll(self):
        """No polling needed."""
        return False

    @property
    def device_class(self):
        """Return the sensor class."""
        return self._device_class

is_on returns the current state (on or off) of this particular instance.

    @property
    def is_on(self):
        """Return true if the sensor state is True."""
        return self._state

binary_sensor_update()

Called from: handle_receive() in component
Returns: None

binary_sensor_update() is called for every incoming event sent on this sensor's signal. We check that it is a W800rf32 event object, then update the state.

    @callback
    def binary_sensor_update(self, event):
        """Call for control updates from the w800rf32 gateway."""
        import W800rf32 as w800rf32mod

        if not isinstance(event, w800rf32mod.W800rf32Event):
            return

        dev_id = event.device
        command = event.command

        _LOGGER.debug(
            "BinarySensor update (Device ID: %s Command %s ...)",
            dev_id, command)

        # Update the w800rf32 device state
        if command in ('On', 'Off'):
            is_on = command == 'On'
            self.update_state(is_on)

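evt.async_track_point_in_time() schedules a callback to run at a given point in time, which is what allows us to have an off_delay: property for our binary sensor. dt_util.utcnow() returns the current time in UTC, and adding the off_delay to it gives the time at which the sensor should switch off.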

        if (self.is_on and self._off_delay is not None and
                self._delay_listener is None):

            self._delay_listener = evt.async_track_point_in_time(
                self.hass, self._off_delay_listener,
                dt_util.utcnow() + self._off_delay)
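
The off-delay logic can be sketched with nothing but asyncio. This is an illustration under assumptions, not HA code: loop.call_later() stands in for evt.async_track_point_in_time(), DemoSensor and handle_command() are hypothetical names, and the delay is shortened to a fraction of a second so the example finishes quickly.

```python
import asyncio


class DemoSensor:
    """Hypothetical sensor that switches itself off after a delay."""

    def __init__(self, off_delay):
        self.is_on = False
        self._off_delay = off_delay      # seconds, or None for no auto-off
        self._delay_listener = None      # pending timer handle, if any

    def _off_delay_listener(self):
        """Timer fired: forget the handle and switch off."""
        self._delay_listener = None
        self.is_on = False

    def handle_command(self, command):
        """Apply an 'On'/'Off' command and arm the off timer if needed."""
        if command in ("On", "Off"):
            self.is_on = command == "On"
        # Arm at most one timer; repeated 'On' events do not restart it.
        if (self.is_on and self._off_delay is not None
                and self._delay_listener is None):
            loop = asyncio.get_running_loop()
            self._delay_listener = loop.call_later(
                self._off_delay, self._off_delay_listener)


async def demo():
    sensor = DemoSensor(off_delay=0.05)
    sensor.handle_command("On")
    state_after_on = sensor.is_on
    await asyncio.sleep(0.2)             # wait past the off delay
    return state_after_on, sensor.is_on


result = asyncio.run(demo())
```

As in the platform code, the check on _delay_listener means a second 'On' event that arrives while a timer is pending does not extend the delay.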

update_state sets the state (on or off) of this particular instance.

    def update_state(self, state):
        """Update the state of the device."""
        self._state = state
        self.async_schedule_update_ha_state()

async_added_to_hass() registers binary_sensor_update() as the listener for this sensor's signal by calling async_dispatcher_connect(). This is essentially paired with the async_dispatcher_send() call, which is described in the w800rf32 component.

    async def async_added_to_hass(self):
        """Register update callback."""
        async_dispatcher_connect(self.hass, self._signal,
                                 self.binary_sensor_update)
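
The connect/send pairing can be illustrated with a tiny stand-alone dispatcher. Everything here is a hypothetical stand-in written for this page: connect() and send() only imitate the roles of async_dispatcher_connect() and async_dispatcher_send(), and the signal template 'w800rf32_{}' is an assumed value for W800RF32_DEVICE, which is defined in the component and not shown here.

```python
from collections import defaultdict

SIGNAL_TEMPLATE = "w800rf32_{}"   # assumed value, for illustration only

_listeners = defaultdict(list)    # signal name -> list of callbacks


def connect(signal, target):
    """Register target to be called whenever signal is sent."""
    _listeners[signal].append(target)


def send(signal, *args):
    """Call every callback registered for signal."""
    for target in _listeners[signal]:
        target(*args)


received = []
# The platform side subscribes one callback per device signal.
connect(SIGNAL_TEMPLATE.format("c1"),
        lambda event: received.append(("c1", event)))
connect(SIGNAL_TEMPLATE.format("a2"),
        lambda event: received.append(("a2", event)))

# The component side sends an event for device c1 only.
send(SIGNAL_TEMPLATE.format("c1"), "On")
```

Because each sensor listens on a signal built from its own device_id, only the c1 callback runs here.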