Source code for musicdb.lib.stream.gstreamer

# MusicDB,  a music manager with web-bases UI that focus on music.
# Copyright (C) 2018 - 2021  Ralf Stemmer <ralf.stemmer@gmx.net>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""
GStreamer Pipeline
------------------

Installation of GStreamer Plugins
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

    .. code-block:: bash

        pacman -S gst-plugins-good gst-python gst-plugins-bad
        # -good for mp3
        # -bad  for m4a/aac

Example for Using Plugins
^^^^^^^^^^^^^^^^^^^^^^^^^

    .. code-block:: bash

        gst-launch-1.0 filesrc location=in.m4a ! decodebin ! audioconvert ! lamemp3enc target=1 bitrate=320 cbr=true ! filesink location=out.mp3

"""

import logging
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst

[docs]class GStreamerInterface(object): r""" This class provides a simple abstraction to the GStreamer Python module. The class is made to manage one pipeline that can be executed in a thread. Therefore a state machine is implemented providing the following states. The state represent the state of the pipeline execution process. The current state can be checked by calling :meth:`~GetState` IDLE: After instantiating the state machine is in IDLE state, as well as after the pipeline was completely executed. RUNNING: When the pipeline gets executed, the state machine is in RUNNING state. This happens when the blocking method :meth:`~Execute` got called. This method is thread save an can be executed concurrently. CANCEL: When the state machine is in this state, the RUNNING state will be left. This is state that can be forced by calling :meth:`~Cancel`. ERROR: When an error occures, the state machine goes into the ERROR state. This state will never be left! .. graphviz:: digraph finite_state_machine { size="8,12" node [shape = circle, color=black, fontsize=10, label="IDLE" ] idle; node [shape = circle, color=black, fontsize=10, label="RUNNING" ] running; node [shape = circle, color=black, fontsize=10, label="CANCEL" ] cancel; node [shape = circle, color=black, fontsize=10, label="ERROR" ] error; idle -> running [ label = "GStreamerInterface.Execute()\n/ GStreamer Pipeline State := PLAYING" ]; idle -> error [ label = "Starting GStreamer Pipeline failed" ]; running -> cancel [ label = "GStreamer Error Message" ]; running -> cancel [ label = "GStreamer EOS Message" ]; cancel -> idle [ label = "/ GStreamer Pipeline State := NULL" ]; running -> error [ label = "Tried to start pipeline that was not IDLE" ]; cancel -> error [ label = "Tried to start pipeline that was not IDLE" ]; } These state machine is independent from the `GStreamer Pipeline State Machine <https://gstreamer.freedesktop.org/documentation/design/states.html>`_! Args: pipelinename (str): Optional name for the pipeline """ def __init__(self, pipelinename="pipeline"): self.pipeline = Gst.Pipeline.new(pipelinename) self.state = "IDLE"
[docs] def CreateElement(self, elementname, name): """ This method adds a GStreamer Element to the GStreamer Pipeline. When the element exists and gets added successfully, its instance gets returned. Otherwise ``None`` will be returned. Args: elementname (str): name of the GStreamer Element as it is used by GStreamer name(str): A unique name for the instance of the element that gets added to the Pipeline Returns: ``None`` on error, otherwise the instance of the added GStreamer Element. Example: Creates a new pipeline named ``"example"``. Then two elements from the GStreamer core elements called ``filesrc`` and ``decodebin`` gets added to the Pipeline. As source location the m4a file ``test.m4a`` will be set. Then, the source gets linked to the decoder. .. code-block:: python gstreamer = GStreamerInterface("example") source = gstreamer.CreateElement("filesrc", "source") decoder = gstreamer.CreateElement("decodebin", "decoder") source.set_property("location", "./test.m4a") source.link(decoder) For more details about the ``filesrc`` and ``decodebin`` elements, call the following bash commands: .. code-block:: bash gst-inspect-1.0 filesrc gst-inspect-1.0 decodebin """ # Find element factory = Gst.ElementFactory.find(elementname) if not factory: logging.error("GStreamer Element %s not installed! - Adding %s failed.", elementname, name) return None # Create instance of element element = Gst.ElementFactory.create(factory, name) if not element: logging.error("GStreamer Element %s not created, even though it exists! - Adding %s failed.", elementname, name) return None # Add element to pipeline self.pipeline.add(element) return element
[docs] def GetState(self): """ Returns: The current state of the GStreamer Pipeline execution as string """ return self.state
[docs] def Cancel(self): """ This method cancels the current running execution of the GStreamer Pipeline. It sets the state to CANCEL only when the current state is RUNNING. Otherwise nothing will be changed. """ if self.state == "RUNNING": self.state = "CANCEL"
# TODO Control flow
[docs] def Execute(self): """ Only call this method when the pipeline execution is in IDLE state! Example: Running the pipeline as thread .. code-block:: python gstreamer = GStreamerInterface() # … Setup pipeline … def StartPipelineThread(): # Start GStreamer pipeline gstreamerthread = Thread(target=gstreamer.Execute) gstreamerthread.start() # Wait until the pipeline actually started while True: state = gstreamer.GetState() if state == "RUNNING": return True elif state != "IDLE": print("Unexpected gstreamer state: %s"%(gstate)) return False """ if self.state != "IDLE": logging.error("GStreamer Interface was not in IDLE state but in %s state. Entering ERROR state!", self.state) self.state = "ERROR" return # start playing ret = self.pipeline.set_state(Gst.State.PLAYING) if ret == Gst.StateChangeReturn.FAILURE: logging.error("Unable to set the GStreamer Pipeline to the PLAYING state") self.state = "ERROR" return bus = self.pipeline.get_bus() self.state = "RUNNING" while self.state == "RUNNING": message = bus.timed_pop_filtered(0.1 * Gst.SECOND, Gst.MessageType.ERROR | Gst.MessageType.WARNING | Gst.MessageType.EOS) if message: messagetype = message.type if messagetype == Gst.MessageType.ERROR: error, dbg = message.parse_error() logging.error("GStreamer error for element %s: %s", message.src.get_name(), error.message) if dbg: logging.debug("Detailed information for previous GStreamer error: %s", dbg) self.Cancel() elif messagetype == Gst.MessageType.WARNING: warn, dbg = message.parse_warning() logging.warning("GStreamer warning for element %s: %s", message.src.get_name(), warn.message) if dbg: logging.debug("Detailed information for previous GStreamer warning: %s", dbg) elif messagetype == Gst.MessageType.EOS: self.Cancel() else: # this should never happen logging.warning("Unexpected message received from GStreamer bus!") self.pipeline.set_state(Gst.State.NULL) self.state = "IDLE"
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4