git.haldean.org bitpic / master runmatrix.py
master

Tree @master (Download .tar.gz)

runmatrix.py @masterraw · history · blame

#!/usr/bin/env python3

import bibliopixel
from bibliopixel import Matrix, Rotation
from bibliopixel.drivers.channel_order import ChannelOrder
from bibliopixel.drivers.serial import Serial, LEDTYPE

import anims
import argparse
from colors import *
from datetime import *
import logging
import math
import numpy as np
import requests
import screen
from screen import Screen
import threading
import time
import traceback

try:
    import deviceconfig
except ImportError:
    import defaultconfig as deviceconfig


class SerialError(Exception):
    pass

class ErrorCheckingSerialDriver(Serial):
    def _write(self, packet):
        if not self._com:
            raise SerialError("driver not initialized")
        try:
            return self._com.write(packet)
        except Exception as e:
            raise SerialError(e)

    def set_brightness(self, b):
        if b > deviceconfig.MAX_BRIGHT:
            b = deviceconfig.MAX_BRIGHT
        super().set_brightness(b)


class Bitpic:
    def __init__(self):
        self.sid = deviceconfig.SID
        print("starting bitpic with screen ID {}".format(self.sid))
        self.new_screen = None
        self.current_screen = None
        self.current_anim = None
        self.last_anim_check = None
        self.create_driver()

    def create_driver(self):
        while True:
            try:
                self.driver = ErrorCheckingSerialDriver(
                    num = 8*8,
                    c_order = ChannelOrder.GRB,
                    ledtype = LEDTYPE.WS2812)
                self.led = Matrix(
                    self.driver,
                    rotation = Rotation.ROTATE_90,
                    vert_flip = False,
                    serpentine = False)
                return
            except ValueError as e:
                print(e)
                time.sleep(1)

    def write_screen(self, s, retry=3):
        self.driver.set_brightness(s.brightness)
        for r, row in enumerate(s.colordata):
            for c, color in enumerate(row):
                color = filter_rgb(color)
                self.led.setRGB(c, r, color[0], color[1], color[2])
        self.led.push_to_driver()

    def fade_between(self, a, b):
        if a is None:
            self.write_screen(b)
            return

        fade_time = 5
        fps = 60
        frame_count = int(math.ceil(fade_time * fps))
        frame_time = fade_time / frame_count
        a_br = a.brightness
        b_br = b.brightness

        hsv_a = [[rgb2hsv(rgb) for rgb in row] for row in a.colordata]
        hsv_b = [[rgb2hsv(rgb) for rgb in row] for row in b.colordata]

        for t in np.linspace(0., 1., frame_count):
            br = int(round(a_br + t * (b_br - a_br)))
            self.driver.set_brightness(br)
            for r in range(len(hsv_a)):
                for c in range(len(hsv_a[r])):
                    a_c = hsv_a[r][c]
                    b_c = hsv_b[r][c]
                    hsv_frame = (
                        (a_c[0] + (b_c[0] - a_c[0]) * t),
                        (a_c[1] + (b_c[1] - a_c[1]) * t),
                        (a_c[2] + (b_c[2] - a_c[2]) * t))
                    rgb_frame = filter_hsv(hsv_frame)
                    self.led.setRGB(c, r, rgb_frame[0], rgb_frame[1], rgb_frame[2])
            self.led.push_to_driver()
            time.sleep(frame_time)

    def check_for_anim(self, current_anim):
        anim_args = {"brightness": 100}
        def animtype(t):
            if type(current_anim) == t:
                return current_anim
            return t(**anim_args)

        return animtype(anims.IDL)

        now = datetime.now()
        if now.month == 1 and now.day == 1:
            return animtype(anims.NYE)
        if now.month == 12 and now.day == 25:
            return animtype(anims.XMS)
        if now.month == 5 and now.day == 29 and SID == "ajb":
            return animtype(anims.AJB)
        if now.month == 2 and now.day == 11 and SID == "mnd":
            return animtype(anims.JDB)
        if now.month == 11 and now.day == 21 and SID == "mnd":
            return animtype(anims.KCS)
        return None

    def load(self, sid=None):
        if sid is None:
            sid = self.sid
        while True:
            url = "http://{}/{}".format(screen.HOST, sid)
            try:
                res = requests.get(url).text
                self.last_anim_check = datetime.now()
            except requests.ConnectionError as e:
                print("failed to fetch, sleeping 10s before retry: {}".format(e))
                time.sleep(10)
                continue
            if res.startswith("anim/"):
                self.current_anim = anims.get(self.current_anim, res)
                return None
            elif res.startswith("redir/"):
                sid = res.split('/')[1]
                continue
            else:
                try:
                    return Screen.decode(res)
                except ValueError as e:
                    print("failed to load screen: {}".format(e))
                    return None

    def anim_loop(self):
        current_anim = self.current_anim
        current_screen = self.current_screen
        new_screen = self.new_screen

        if current_anim is not None:
            # if we get here, we either didn't check for new data and are in an
            # animation, or we did check for new data and found out we're in an
            # animation.
            anim_screen = current_anim.get_screen()
            self.write_screen(anim_screen)
            time.sleep(current_anim.desired_frame_time)
        elif new_screen is not None:
            if new_screen != current_screen:
                self.fade_between(current_screen, new_screen)
                self.current_screen = new_screen
            else:
                # we write even if the screen hasn't changed so we can
                # detect disconnected displays for reconnection
                self.write_screen(current_screen)
            time.sleep(0.1)
        else:
            time.sleep(0.1)

    def main(self):
        def run_load_loop():
            while True:
                try:
                    self.new_screen = self.load()
                    if self.new_screen is not None:
                        self.current_anim = None
                except Exception:
                    traceback.print_exc()
                time.sleep(5)

        load_thread = threading.Thread(target=run_load_loop, daemon=True)
        load_thread.start()
        try:
            while True:
                try:
                    self.anim_loop()
                except SerialError:
                    self.create_driver()
        except KeyboardInterrupt:
            self.led.all_off()
            self.led.push_to_driver()


if __name__ == "__main__":
    bp = Bitpic()
    bp.main()