"""
base_camera_server.py: Holds the Python implementation of a ``CameraServer`` as defined in ``pepi.thrift``.
"""
import logging
import os
import tempfile
from io import BytesIO
import uuid
import collections
from future.utils import viewitems
import atexit
import threading
import time
import typing
from PIL import Image
import numpy as np
from .stream import MJPGStreamer
from .iptools import IPTools
from .pepi_thrift_loader import ImageUnavailable
from .abstract_camera import AbstractCamera
[docs]class CameraTimelapser(threading.Thread):
"""
CameraTimelapser is a utility class designed to call the
``low_res_still()`` method on a concrete ``AbstractCamera``
at a defined rate and save it to a given folder for the
purposes of timelapsing or streaming from the camera.
"""
def __init__(self, camera, folder, interval):
# type: (AbstractCamera, str, typing.Union[float or int]) -> None
super(CameraTimelapser, self).__init__()
self.camera = camera
self.path = folder
self.interval = interval
self.daemon = True
def cleanup(self): # pragma: no cover
self.camera = None
atexit.register(cleanup, self)
[docs] def run(self):
start = time.time()
count = 0
while True:
if time.time() > (start + self.interval):
start = time.time()
image = Image.fromarray(np.array(self.camera.low_res_still(), dtype=np.uint8))
image.save(self.path + '/{}.jpeg'.format(count))
count += 1
else:
time.sleep(self.interval/2)
# noinspection PyMethodMayBeStatic
[docs]class BaseCameraServer(object):
"""
BaseCameraServer is the minimal Python implementation of a CameraServer
as defined in ``pepi.thrift``. CameraServers are used in with the
Apache Thrift protocol to provide RPC mechanisms, allowing control
of this server over RPC, if it is launched with Thrift.
A CameraServer subclassing BaseCameraServer may override any of these
methods to better reflect their use. However, care must be taken to
ensure that the side effects of the subclass's methods do not affect
other methods. For example, if you were to change the capture method
to store images in a list for whatever reason, you would need to change
the image retrieval methods.
A CameraServer's use-case is to provide a server that controls a
number of cameras to be controlled in a consistent manner. This allows
for a client to seamlessly control all implementations of CameraServer's,
over Thrift without needing to concern themselves with what cameras are
attached, the procedure call names, etc.
This BaseCameraServer implementation supports multiple connected cameras,
that are transparent to the connecting client. When retrieving images, a
list of encoded images are returned. The order of this list remains
consistent across procedure calls.
"""
StreamInfo = collections.namedtuple('StreamInfo', 'port, folder, streamer')
STREAM_PORT = 6001
def __init__(self, cameras, stream=True):
# type: ([AbstractCamera], bool) -> None
"""
Initialises the BaseCameraServer.
:param cameras: a list of AbstractCamera objects
:param stream: True to start streams for all cameras, False to not.
"""
# self.cameras = CameraManager(cameras)
self.cameras = cameras
self._stored_captures = dict()
self.streams = dict()
self.identifier = str(uuid.uuid4().hex)
if stream:
StreamInfo = collections.namedtuple('StreamInfo', 'port, folder, streamer, capturer')
for count, camera in enumerate(cameras):
port_ = self.STREAM_PORT + count
folder_ = tempfile.mkdtemp()
streamer_ = MJPGStreamer(folder_, port=port_)
capturer = CameraTimelapser(camera=camera, folder=folder_, interval=0.5)
capturer.start()
self.streams[camera] = StreamInfo(port=port_, folder=folder_, streamer=streamer_, capturer=capturer)
def cleanup(): # pragma: no cover
"""
Cleans up after this server by destroying connected cameras
and their streams, and erasing the stored images.
"""
logging.info('Cleaning up RaspPiCameraServer')
self._stored_captures = None
self.cameras = None
self.streams = None
logging.info('Cleanup complete for RaspPiCameraServer')
atexit.register(cleanup)
[docs] def ping(self):
# type: () -> bool
"""
Ping the server to check if it is active and responding.
:return: True (always)
"""
logging.info('ping()')
return True
[docs] def identify(self):
# type: () -> str
"""
Get the unique identifier of this server.
:return: the server's unique identifier string
"""
logging.info('identify()')
return self.identifier
@staticmethod
def _current_ip():
# type: () -> str
return IPTools.current_ips()[0]
[docs] def stream_urls(self):
# type: () -> [str]
"""
Get the a list of URLs where the MJPG image stream of each camera
connected to this server may be accessed.
The order of the returned images is consistent, e.g. Camera #1, #2
.., #x returned in that order.
:return: a list of the stream URLs as a string
"""
logging.info('stream_urls()')
out_urls = []
for _, stream_info in viewitems(self.streams):
out_urls.append('http://{}:{}/stream.mjpeg'.format(self._current_ip(), stream_info.port))
return out_urls
[docs] def shutdown(self):
# type: () -> None
"""
Shutdown the server (i.e. power-off).
Subclasses may choose to
ignore calls to this function, in which case they should override
this function to do nothing.
:return: None
"""
logging.info('shutdown()')
os.system('shutdown now')
[docs] def start_capture(self, data_code):
# type: (str) -> None
"""
Immediately starts the process of capturing from this server's Camera(s),
and stores the captured data under the given unique data_code.
Note: the received `data_code` is assumed to be unique. Subclasses may
choose to implement better isolation methods, but this is not
guaranteed nor required.
:param data_code: the requested data_code to store the capture under
:return: None
"""
logging.info('start_capture(data_code: {})'.format(data_code))
captures = []
# TODO: parallelize capture from all cameras
for camera in self.cameras:
try:
captures.append(Image.fromarray(np.array(camera.still(), dtype=np.uint8)))
except (AttributeError, TypeError, ValueError) as e:
logging.warn('Could not construct image from received RGB array: {}'.format(e))
continue
if captures:
self._stored_captures[data_code] = captures
logging.info('Stored_captured after start_capture(): {}'.format(self._stored_captures.keys()))
def _retrieve_and_encode_from_stored_captures(self, data_code, encoding, quality):
# type: (str, str, int) -> [str]
try:
image_list = self._stored_captures.pop(data_code)
except KeyError:
raise ImageUnavailable('No images are stored under the data_code "{}"'.format(data_code))
else:
out_strings = []
for image in image_list:
image_buffer = BytesIO()
image.save(image_buffer, encoding, quality=quality)
out_strings.append(image_buffer.getvalue())
return out_strings
[docs] def retrieve_stills_png(self, with_data_code):
# type: (str) -> [str]
"""
Retrieves the images stored under `with_data_code`, if they exist, and
encodes them into a .png str (i.e. bytes).
The order of the returned images is consistent, e.g. Camera #1, #2
.., #x returned in that order.
:param with_data_code: the data_code from which the image will be retrieved
:raises: ImageUnavailable: when image requested with an invalid/unknown data_code
:return: a list of strings with each string containing an encoded as a .png
"""
logging.info('retrieve_stills_png(with_data_code: {})'.format(with_data_code))
return self._retrieve_and_encode_from_stored_captures(with_data_code, 'PNG', quality=3)
[docs] def retrieve_stills_jpg(self, with_data_code):
# type: (str) -> [str]
"""
Retrieves the images stored under `with_data_code`, if they exist, and
encodes them into a .jpg str (i.e. bytes).
The order of the returned images is consistent, e.g. Camera #1, #2
.., #x returned in that order.
:param with_data_code: the data_code from which the image will be retrieved
:raises: ImageUnavailable: when image requested with an invalid/unknown data_code
:return: a list of strings with each string containing an encoded as a .jpg
"""
logging.info('retrieve_stills_jpg(with_data_code: {})'.format(with_data_code))
return self._retrieve_and_encode_from_stored_captures(with_data_code, 'JPEG', quality=85)
[docs] def enumerate_methods(self):
# type: () -> [(str, str)]
"""
Retrieves a map of the methods available on this server. This is useful
for clients to verify the methods it can expect to be able to call
if being called remotely.
:return: dict of <method_name: [arguments]>
"""
import inspect
methods = inspect.getmembers(self, predicate=inspect.ismethod)
output_dict = dict()
for _tuple in methods:
name, pointer = _tuple
args = inspect.getargspec(pointer).args
try:
args.remove('self')
except ValueError: # pragma: no cover
pass
output_dict[name] = args
return output_dict