#!/usr/bin/env python3
import bibliopixel
from bibliopixel import Matrix, Rotation
from bibliopixel.drivers.channel_order import ChannelOrder
from bibliopixel.drivers.serial import Serial, LEDTYPE
import anims
import argparse
from colors import *
from datetime import *
import logging
import math
import numpy as np
import requests
import screen
from screen import Screen
import threading
import time
import traceback
# Prefer the per-device settings module; when it cannot be imported, bind the
# default settings module to the same name so the rest of the file can use
# `deviceconfig` unconditionally.
try:
    import deviceconfig
except ImportError:
    import defaultconfig as deviceconfig
class SerialError(Exception):
    """Raised by ErrorCheckingSerialDriver when a serial write cannot be done."""
class ErrorCheckingSerialDriver(Serial):
    """Serial driver that surfaces write failures and caps brightness.

    Any failure while writing a packet is reported as SerialError, which the
    main loop catches in order to recreate the driver.
    """

    def _write(self, packet):
        """Write *packet* to the underlying port and return the write result.

        Raises:
            SerialError: if ``self._com`` is unset/falsy, or if the write
                raises any exception (the original exception is attached as
                the cause).
        """
        if not self._com:
            raise SerialError("driver not initialized")
        try:
            return self._com.write(packet)
        except Exception as e:
            # Chain explicitly so the traceback shows the real I/O failure.
            raise SerialError(e) from e

    def set_brightness(self, b):
        """Set brightness to *b*, limited to ``deviceconfig.MAX_BRIGHT``."""
        super().set_brightness(min(b, deviceconfig.MAX_BRIGHT))
class Bitpic:
def __init__(self):
self.sid = deviceconfig.SID
print("starting bitpic with screen ID {}".format(self.sid))
self.new_screen = None
self.current_screen = None
self.current_anim = None
self.last_anim_check = None
self.create_driver()
def create_driver(self):
while True:
try:
self.driver = ErrorCheckingSerialDriver(
num = 8*8,
c_order = ChannelOrder.GRB,
ledtype = LEDTYPE.WS2812)
self.led = Matrix(
self.driver,
rotation = Rotation.ROTATE_90,
vert_flip = False,
serpentine = False)
return
except ValueError as e:
print(e)
time.sleep(1)
def write_screen(self, s, retry=3):
self.driver.set_brightness(s.brightness)
for r, row in enumerate(s.colordata):
for c, color in enumerate(row):
color = filter_rgb(color)
self.led.setRGB(c, r, color[0], color[1], color[2])
self.led.push_to_driver()
def fade_between(self, a, b):
if a is None:
self.write_screen(b)
return
fade_time = 5
fps = 60
frame_count = int(math.ceil(fade_time * fps))
frame_time = fade_time / frame_count
a_br = a.brightness
b_br = b.brightness
hsv_a = [[rgb2hsv(rgb) for rgb in row] for row in a.colordata]
hsv_b = [[rgb2hsv(rgb) for rgb in row] for row in b.colordata]
for t in np.linspace(0., 1., frame_count):
br = int(round(a_br + t * (b_br - a_br)))
self.driver.set_brightness(br)
for r in range(len(hsv_a)):
for c in range(len(hsv_a[r])):
a_c = hsv_a[r][c]
b_c = hsv_b[r][c]
hsv_frame = (
(a_c[0] + (b_c[0] - a_c[0]) * t),
(a_c[1] + (b_c[1] - a_c[1]) * t),
(a_c[2] + (b_c[2] - a_c[2]) * t))
rgb_frame = filter_hsv(hsv_frame)
self.led.setRGB(c, r, rgb_frame[0], rgb_frame[1], rgb_frame[2])
self.led.push_to_driver()
time.sleep(frame_time)
def check_for_anim(self, current_anim):
anim_args = {"brightness": 100}
def animtype(t):
if type(current_anim) == t:
return current_anim
return t(**anim_args)
return animtype(anims.IDL)
now = datetime.now()
if now.month == 1 and now.day == 1:
return animtype(anims.NYE)
if now.month == 12 and now.day == 25:
return animtype(anims.XMS)
if now.month == 5 and now.day == 29 and SID == "ajb":
return animtype(anims.AJB)
if now.month == 2 and now.day == 11 and SID == "mnd":
return animtype(anims.JDB)
if now.month == 11 and now.day == 21 and SID == "mnd":
return animtype(anims.KCS)
return None
def load(self, sid=None):
if sid is None:
sid = self.sid
while True:
url = "http://{}/{}".format(screen.HOST, sid)
try:
res = requests.get(url).text
self.last_anim_check = datetime.now()
except requests.ConnectionError as e:
print("failed to fetch, sleeping 10s before retry: {}".format(e))
time.sleep(10)
continue
if res.startswith("anim/"):
self.current_anim = anims.get(self.current_anim, res)
return None
elif res.startswith("redir/"):
sid = res.split('/')[1]
continue
else:
try:
return Screen.decode(res)
except ValueError as e:
print("failed to load screen: {}".format(e))
return None
def anim_loop(self):
current_anim = self.current_anim
current_screen = self.current_screen
new_screen = self.new_screen
if current_anim is not None:
# if we get here, we either didn't check for new data and are in an
# animation, or we did check for new data and found out we're in an
# animation.
anim_screen = current_anim.get_screen()
self.write_screen(anim_screen)
time.sleep(current_anim.desired_frame_time)
elif new_screen is not None:
if new_screen != current_screen:
self.fade_between(current_screen, new_screen)
self.current_screen = new_screen
else:
# we write even if the screen hasn't changed so we can
# detect disconnected displays for reconnection
self.write_screen(current_screen)
time.sleep(0.1)
else:
time.sleep(0.1)
def main(self):
def run_load_loop():
while True:
try:
self.new_screen = self.load()
if self.new_screen is not None:
self.current_anim = None
except Exception:
traceback.print_exc()
time.sleep(5)
load_thread = threading.Thread(target=run_load_loop, daemon=True)
load_thread.start()
try:
while True:
try:
self.anim_loop()
except SerialError:
self.create_driver()
except KeyboardInterrupt:
self.led.all_off()
self.led.push_to_driver()
if __name__ == "__main__":
    # Script entry point: build the controller (which opens the LED driver)
    # and run it until interrupted.
    Bitpic().main()