Source code for server.tests.test_stream

import pytest

from server import MJPGStreamer


def _random_image_gen(resolution=(704, 528)):
    import numpy as np
    return (np.random.rand(resolution[1], resolution[0], 3) * 255).astype(np.uint8)


[docs]def test_newest_file_in_folder_generator(tmpdir): second_newest = str(tmpdir) + '/0.txt' newest = str(tmpdir) + '/1.txt' open(second_newest, 'a').close() open(newest, 'a').close() for count, file_ in enumerate(MJPGStreamer.newest_file_in_folder(str(tmpdir)), start=2): assert file_ == second_newest second_newest = newest newest = str(tmpdir) + '/{}.txt'.format(count) open(newest, 'a').close() if count >= 10: break
[docs]def test_jpeg_generator(tmpdir): from io import BytesIO from PIL import Image Image.fromarray(_random_image_gen()).save(str(tmpdir) + '/0.jpg') Image.fromarray(_random_image_gen()).save(str(tmpdir) + '/1.jpg') for count, jpg in enumerate(MJPGStreamer.jpeg_image_generator(str(tmpdir), resolution=(640, 480)), start=2): im = BytesIO(jpg) im = Image.open(im) assert im.format == 'JPEG' assert im.size == (640, 480) Image.fromarray(_random_image_gen()).save(str(tmpdir) + '/{}.jpg'.format(count)) if count >= 10: break
[docs]def test_response(tmpdir): import requests from contextlib import closing from PIL import Image _ = MJPGStreamer(str(tmpdir), port=6002, ip='127.0.0.1') Image.fromarray(_random_image_gen()).save(str(tmpdir) + '/0.jpg') Image.fromarray(_random_image_gen()).save(str(tmpdir) + '/1.jpg') Image.fromarray(_random_image_gen()).save(str(tmpdir) + '/3.jpg') Image.fromarray(_random_image_gen()).save(str(tmpdir) + '/4.jpg') url = 'http://127.0.0.1:{}/stream.mjpg'.format(6002) with closing(requests.get(url, stream=True)) as r: assert r.status_code == 200 if r.encoding is None: r.encoding = 'utf-8' for count, line in enumerate(r.iter_lines(), start=5): if line.startswith(b'\xff\xd8\xff\xe0'): Image.fromarray(_random_image_gen()).save(str(tmpdir) + '/{}.jpg'.format(count)) if count > 10: break else: continue else: pytest.fail('Never got a valid JPG magic number', pytrace=True)