Giter VIP home page Giter VIP logo

membrane_core's People

Contributors

agga1 avatar andpodob avatar balins avatar bartkrak avatar bblaszkow06 avatar bradhanks avatar britton-jb avatar daniel-jodlos avatar djanda97 avatar dominikwolek avatar eboskma avatar felonekonom avatar hajto avatar ixf avatar janpieper-gcx avatar jasonwc avatar mat-hek avatar membraneframeworkadmin avatar mickel8 avatar mkaput avatar mspanc avatar mtsznowak avatar noarkhh avatar nobikik9 avatar qizot avatar rados13 avatar ream88 avatar sax avatar sheldak avatar varsill avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

membrane_core's Issues

Add mechanism for caps reconciliation

Currently we do not have such mechanism, and each element decides which caps to use itself, which of course will not work in some cases. We should consider when (in which playback state) it is to be done.

Current way of declaring children in pipeline makes it impossible to pass parameters

The old way of children declaration with parameters was:

name: {module, options, parameters}

Current way supports only

name: %Module{option: value, ...}

or

name: Module

We should make it possible to do:

name: {%Module{option: value, ...}, parameters}

and

name: {Module, parameters}

and remove support for the old way to avoid ambiguities

Support for different buffer types

Currently we have only one type of buffer (%Membrane.Buffer{} struct). We should consider implementing for each caps a corresponding buffer struct, and move there some metadata that is held in caps by now (such as some properties of mpeg frame that change each time)

Consider creating separate callbacks for some notifications

Currently, for example, the pipeline is receiving a notification {:start_of_stream, :input} when the stream starts.

Originally the messages, now called notifications were created to create the way of capturing messages from the elements. They're by design highly customizable by an ability to pass an arbitrary structure to a notification, as we're not able to predict all possible elements and their use cases.

The Core's callbacks are kind of reverse are well defined and do not change depending on the actual element being used.

IMO we should add new callbacks such as handle_start_of_stream for all cases of notifications currently issued by non-elements.

RC on playback state change when dynamically spawning children

Consider the following scenario:

  1. pipeline changes playback state to playing
  2. new child X is spawned
  3. pipeline starts switching to prepared
  4. all children receive request to go to prepared, besides X, who's changing to playing and does not receive message yet
  5. X goes from stopped to prepared and notifies the pipeline, all other children go from playing to prepared and also notify the pipeline
  6. pipeline finishes the change to prepared
  7. X switches to playing, because its target state has not been updated yet
  8. X finally receives the request that others received at 4.
  9. X goes to prepared

In points 7. and 8. X is in playing, and the pipeline is in prepared, which is unwanted. Although this does not seem to be a serious bug, it may cause problems. Informimg pipeline of both new and target playback state might be a fix.

Demand units should be taken from caps

For example, demand of size k should result in:

  • demanding k MPEG frames on pad with MPEG caps
  • demanding k*channels_no samples on pad with raw caps
  • demanding k bytes on pad with bytestream caps (that need to be created)

To make this usable, we should also add some demand converting functionality - each caps should export function converting k-element demand to equivalent size in bytes (possibly approximate)

Caps changing dynamically may introduce some problems to this approach, so place, time and way of converting sizes need to be carrefully chosen. We should also keep in mind that caps may not be present when the first demand is executed.

Demands in bytes may also need to be supported even with caps different than bytestream, for example when ringbuffer has fixed size such way would be convenient.

We should also consider making PullBuffer size declaring unit always bytes, to avoid mistakes.

Change approach to code reuse

Current way of sharing code between modules in the core is putting that code inside the __using__ macro. As this approach has some disadvantages, and we should aim to limit amount of generated code, we need to refactor that modules. One of ways is to pass the name of module containing implementation to each function of the abstraction. Some macros may also be used to improve this approach.

Modules to be refactored:

  • Membrane.Element.Manager.Common
  • Membrane.Mixins.CallbackHandler
  • Membrane.Mixins.Playback

Improve `toilet` API and docs

The documentation does not say how to set up toilet mode for pullbuffer. Also, the API should be reconsidered (toilet should not be hidden under pull_buffer key)

Ignore `buffer: {pad, []}` action

Now, this is undefined behaviour. Ignoring it would remove the need for checking in elements whether buffers list is empty or not.

Consider unifying default behaviour of `demand` action and `handle_demand` callback between different element types

Currently:

  • in filter, handle_demand receives always entire demand, and returning demand overrides previous demand
  • in source, handle_demand receives only the new demand, except for being caused by redemand action, in which case it receives entire demand
  • in sink, returning demand means adding it to existing demand

Imho we should change this, so that:

  • entire demand is always passed to handle_demand
  • returning demand always causes overriding existing demand

As in some cases different behaviour is required, we could:

  • support passing lambda as a size to demand, such as demand: {:sink, & &1+1}
  • pass the new demand to handle_demand in context

If element does invalid action warning messages are not displayed

Today I tried to use invalid element (that issues demand in handle_preparee)

defmodule Membrane.Element.Fake.Sink.Bytes do
  use Membrane.Element.Base.Sink
  def_known_sink_pads %{
    :sink => {:always, {:pull, demand_in: :bytes}, :any}
  }

  def handle_init(_) do
    {:ok, nil}
  end

  def handle_prepare(_, _) do
    {{:ok, demand: {:sink, 65535}}, nil}
  end

  def handle_write1(:sink, %Membrane.Buffer{payload: payload}, _, _) do
    {{:ok, demand: {:sink, byte_size(payload)}}, nil}
  end
end

I got the following error:

** (EXIT from #PID<0.266.0>) shell process exited with reason: {:error, {:cannot_handle_message, [message: {:membrane_change_playback_state, :prepared}, mode: :info, reason: {:error_handling_actions, {:cannot_handle_demand, [playback_state: :stopped, callback: :handle_prepare]}}]}}

Interactive Elixir (1.6.1) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)>
22:58:15.463 [error] GenServer #PID<0.273.0> terminating
** (stop) {:error, {:cannot_handle_message, [message: {:membrane_change_playback_state, :prepared}, mode: :info, reason: {:error_handling_actions, {:cannot_handle_demand, [playback_state: :stopped, callback: :handle_prepare]}}]}}
Last message: {:membrane_change_playback_state, :prepared}
State: %Membrane.Element.Manager.State{controlling_pid: #PID<0.268.0>, internal_state: %Membrane.Element.Manager.State{controlling_pid: #PID<0.268.0>, internal_state: nil, message_bus: #PID<0.268.0>, module: Membrane.Element.Fake.Sink.Bytes, name: :sink, pads: %{data: %{sink: %{accepted_caps: :any, buffer: %Membrane.PullBuffer{current_size: 0, demand: 0, init_size: 0, metric: Membrane.Buffer.Metric.ByteSize, min_demand: 7500, name: :sink, preferred_size: 30000, q: #Qex<[]>, sink: {#PID<0.272.0>, :source}, sink_name: :sink, toilet: false}, caps: nil, direction: :sink, eos: false, is_dynamic: false, mode: :pull, name: :sink, options: %{demand_in: :bytes}, other_name: :source, pid: #PID<0.272.0>, self_demand: 0, sos: false, sticky_messages: []}}, dynamic_currently_linking: [], info: %{}}, playback: %Membrane.Mixins.Playback{async_state_change: false, pending_state: nil, state: :stopped, target_locked?: false, target_state: :prepared}, playback_buffer: %Membrane.Element.Manager.PlaybackBuffer{q: #Qex<[]>}}, message_bus: #PID<0.268.0>, module: Membrane.Element.Fake.Sink.Bytes, name: :sink, pads: %{data: %{sink: %{accepted_caps: :any, buffer: %Membrane.PullBuffer{current_size: 0, demand: 0, init_size: 0, metric: Membrane.Buffer.Metric.ByteSize, min_demand: 7500, name: :sink, preferred_size: 30000, q: #Qex<[]>, sink: {#PID<0.272.0>, :source}, sink_name: :sink, toilet: false}, caps: nil, direction: :sink, eos: false, is_dynamic: false, mode: :pull, name: :sink, options: %{demand_in: :bytes}, other_name: :source, pid: #PID<0.272.0>, self_demand: 0, sos: false, sticky_messages: []}}, dynamic_currently_linking: [], info: %{}}, playback: %Membrane.Mixins.Playback{async_state_change: false, pending_state: nil, state: :stopped, target_locked?: false, target_state: :prepared}, playback_buffer: %Membrane.Element.Manager.PlaybackBuffer{q: #Qex<[]>}}

but I expected to see dev-friendly message defined in https://github.com/membraneframework/membrane-core/blob/master/lib/membrane_core/element/manager/action.ex#L84

PullBuffer should not send demand itself

Instead it should return demand size, and demand should be sent from element. This can improve PullBuffer to be written in more functional way, as it should be.

Display nicer message if links configuration is invalid in pipeline's spec

The current message is not really self-explanatory if you're pointing to a non-existent element while declaring links for pipeline spec.

* (EXIT from #PID<0.470.0>) shell process exited with reason: an exception was raised:
   ** (WithClauseError) no with clause matching: {{:error, {:resolve_link, %{element: :seeker, pad: :sink}, {:unknown_child, :seeker}}}, %Membrane.Pipeline.State{children_ids: %{}, children_to_pids: %{mad: #PID<0.474.0>, parser: #PID<0.475.0>, peakmeter: #PID<0.476.0>, sink: #PID<0.477.0>, source: #PID<0.478.0>}, internal_state: %{}, module: Twist.AutoAudioTrim.Trimmer.Pipeline, pending_pids: nil, pids_to_children: %{#PID<0.474.0> => :mad, #PID<0.475.0> => :parser, #PID<0.476.0> => :peakmeter, #PID<0.477.0> => :sink, #PID<0.478.0> => :source}, playback: %Membrane.Mixins.Playback{async_state_change: false, pending_state: nil, state: :stopped, target_locked?: false, target_state: :stopped}, terminating?: false}}
       (membrane_core) lib/membrane_core/pipeline.ex:192: Membrane.Pipeline.handle_spec/2
       (membrane_core) lib/membrane_core/pipeline.ex:451: Membrane.Pipeline.handle_info/2
       (stdlib) gen_server.erl:616: :gen_server.try_dispatch/4
       (stdlib) gen_server.erl:686: :gen_server.handle_msg/6
       (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3

Interactive Elixir (1.6.1) - press Ctrl+C to exit (type h() ENTER for help)
iex(2)>
22:54:26.922 [error] GenServer #PID<0.472.0> terminating
** (WithClauseError) no with clause matching: {{:error, {:resolve_link, %{element: :seeker, pad: :sink}, {:unknown_child, :seeker}}}, %Membrane.Pipeline.State{children_ids: %{}, children_to_pids: %{mad: #PID<0.474.0>, parser: #PID<0.475.0>, peakmeter: #PID<0.476.0>, sink: #PID<0.477.0>, source: #PID<0.478.0>}, internal_state: %{}, module: Twist.AutoAudioTrim.Trimmer.Pipeline, pending_pids: nil, pids_to_children: %{#PID<0.474.0> => :mad, #PID<0.475.0> => :parser, #PID<0.476.0> => :peakmeter, #PID<0.477.0> => :sink, #PID<0.478.0> => :source}, playback: %Membrane.Mixins.Playback{async_state_change: false, pending_state: nil, state: :stopped, target_locked?: false, target_state: :stopped}, terminating?: false}}
   (membrane_core) lib/membrane_core/pipeline.ex:192: Membrane.Pipeline.handle_spec/2
   (membrane_core) lib/membrane_core/pipeline.ex:451: Membrane.Pipeline.handle_info/2
   (stdlib) gen_server.erl:616: :gen_server.try_dispatch/4
   (stdlib) gen_server.erl:686: :gen_server.handle_msg/6
   (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message: [:membrane_pipeline_spec, %Membrane.Pipeline.Spec{children: %{mad: Membrane.Element.Mad.Decoder, parser: Membrane.Element.MPEGAudioParse.Parser, peakmeter: {Membrane.Element.Audiometer.Peakmeter, %Membrane.Element.Audiometer.Peakmeter.Options{period: 100}}, sink: Membrane.Element.Fake.Sink.Bytes, source: {Membrane.Element.HTTPoison.Source, %Membrane.Element.HTTPoison.Source.Options{body: "", headers: [Connection: "keep-alive", Accept: "audio/mpeg", "User-Agent": "RadioKit Twist: AutoAudioTrim"], location: "http://localhost:8000/variant.mp3", method: :get, options: nil}}}, links: %{{:mad, :source} => {:seeker, :sink, [pull_buffer: [preferred_size: 30000]]}, {:parser, :source} => {:mad, :sink, [pull_buffer: [preferred_size: 30000]]}, {:peakmeter, :source} => {:sink, :sink, [pull_buffer: [preferred_size: 30000]]}, {:source, :source} => {:parser, :sink, [pull_buffer: [preferred_size: 30000]]}}}]
State: %Membrane.Pipeline.State{children_ids: %{}, children_to_pids: %{}, internal_state: %{}, module: Twist.AutoAudioTrim.Trimmer.Pipeline, pending_pids: nil, pids_to_children: %{}, playback: %Membrane.Mixins.Playback{async_state_change: false, pending_state: nil, state: :stopped, target_locked?: false, target_state: :stopped}, terminating?: false}

Consider making PullBuffer size change dynamically (especially on startup)

Currently PullBuffer size is configurable but fixed during playback. We should consider if it helps to adjust it basing on current demand. This is most important on change to playing state, because huge demands occur then, what might affect stream fluency and cause significant processor usage at the moment.

Consider fixing convention for :ok and :error

Now we're using

{{:ok, %Spec{children: children, links: links}}, %{}}

but erlang way is

{:ok, {%Spec{children: children, links: links}}, %{}}

it will probably also allow to do slightly faster matching as you don't have to go into two tuples to check if result was ok

This pattern is spread accross the whole framework and maybe that's last moment to fix this.

@mat-hek @bblaszkow06 @mtsznowak please comment on this

Add API for defining custom element callbacks

We need sort of callbacks in elements that are made in uniform fashion.

Use cases:

  • imagine you have an element that is a sink that is a HTTPS server (think: icecast replacement). It receives a SSL-enabled request and during handshake it has to pick up right SSL certificate based on the domain received via SNI. It has to be synchronously called back.

  • imagine you have an element that is a source to JACK and you want to have custom handler on JACK xrun (underrun)

I would like to have an API similar to def_options that specifies all possible callbacks.

Change pad declarations

  • dynamic pads:
{{:dynamic, pad_name}, {availability, mode, caps}}

to

pad_name: {:on_request, mode, caps}
  • demand unit in sinks:
sink_pad_name:  {availability, {:pull, demand_in: :buffers}, caps}

to

sink_pad_name:  [availability: :cos, caps: adkald, mode: :pull, demand_in: :buffers]

Error when `toilet` receives too many buffers is not handled properly

When pipeline is stopped because of toilet buffer overflow, the error is not handled properly and pipeline fails with Function clause error in Bunch:

2018-12-05T15:43:58.631636Z [warn] [core] PullBuffer :sink (toilet): received 288000 buffers,
                                                                                                     which is above fail_level, from input :output that works in push mode.
                                To have control over amount of buffers being produced, consider using push mode.
                                                                                                                If this is a normal situation, increase toilet warn/fail level.
                                    Buffers: %Membrane.Buffer{metadata: %{}, payload: <<0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...>>}
                                             PullBuffer %Membrane.Core.PullBuffer{current_size: 288000, demand: 65536, demand_pid: #PID<0.305.0>, input_ref: :output, metric: Membrane.Buffer.Metric.ByteSize, min_demand: 16384, name: :sink, preferred_size: 65536, q: #Qex<[{:buffers, [%Membrane.Buffer{metadata: %{}, payload: <<0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...>>}], 288000}]>, toilet: %{fail: 262144, warn: 131072}}

2018-12-05T15:43:58.634813Z [warn] [core] Encountered an error.
                                                               Reason: {:pull_buffer, [toilet: :too_many_buffers]}
                                                                                                                  PullBuffer :sink (toilet): failing: too many buffers
                           Stacktrace:
                                          (membrane_core) lib/membrane/core/pull_buffer.ex:139: Membrane.Core.PullBuffer.store/3
                                                                                                                                    (membrane_core) lib/membrane/core/element/buffer_controller.ex:79: anonymous fn/2 in Membrane.Core.Element.BufferController.handle_buffer_pull/3
                                                                                                                                             (elixir) lib/map.ex:791: Map.get_and_update/3
                                                   (elixir) lib/access.ex:370: Access.get_and_update/3
                                                                                                          (elixir) lib/map.ex:791: Map.get_and_update/3
                (elixir) lib/access.ex:370: Access.get_and_update/3


2018-12-05T15:43:58.636849Z [warn] [core] PullBuffer :sink (toilet): received 288000 buffers,
                                                                                             which is above fail_level, from input :output that works in push mode.
                        To have control over amount of buffers being produced, consider using push mode.
                                                                                                        If this is a normal situation, increase toilet warn/fail level.
                            Buffers: %Membrane.Buffer{metadata: %{}, payload: <<0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...>>}
                                     PullBuffer %Membrane.Core.PullBuffer{current_size: 288000, demand: 65536, demand_pid: #PID<0.305.0>, input_ref: :output, metric: Membrane.Buffer.Metric.ByteSize, min_demand: 16384, name: :sink, preferred_size: 65536, q: #Qex<[{:buffers, [%Membrane.Buffer{metadata: %{}, payload: <<0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...>>}], 288000}]>, toilet: %{fail: 262144, warn: 131072}}

2018-12-05T15:43:58.639087Z [warn] [core] Encountered an error.
                                                               Reason: {:pull_buffer, [toilet: :too_many_buffers]}
                                                                                                                  PullBuffer :sink (toilet): failing: too many buffers
                           Stacktrace:
                                          (membrane_core) lib/membrane/core/pull_buffer.ex:139: Membrane.Core.PullBuffer.store/3
                                                                                                                                    (elixir) lib/map.ex:791: Map.get_and_update/3
                                          (elixir) lib/access.ex:370: Access.get_and_update/3
                                                                                                 (elixir) lib/map.ex:791: Map.get_and_update/3
       (elixir) lib/access.ex:370: Access.get_and_update/3
                                                              (membrane_core) lib/membrane/core/element/buffer_controller.ex:72: Membrane.Core.Element.BufferController.handle_buffer_pull/3


2018-12-05T15:43:58.647542Z [warn] [sink core] Encountered an error.
                                                                    Reason: {:function_clause, [{Bunch, :stateful_try_with_status, [error: %Membrane.Core.Element.State{controlling_pid: #PID<0.300.0>, delayed_demands: %{}, internal_state: %{endpoint_id: :default, latency: :high, native: #Reference<0.2486223907.3552968708.126269>, portaudio_buffer_size: 256, ringbuffer_size: 4096}, module: Membrane.Element.PortAudio.Sink, name: :sink, pads: %{data: %{input: %Membrane.Element.Pad.Data{accepted_caps: {Membrane.Caps.Audio.Raw, [channels: 2, sample_rate: 48000, format: :s16le]}, availability: :always, buffer: {:pull_buffer, [toilet: :too_many_buffers]}, caps: nil, current_id: nil, demand: 16384, demand_unit: :bytes, direction: :input, end_of_stream?: false, mode: :pull, other_demand_unit: nil, other_ref: :output, pid: #PID<0.305.0>, start_of_stream?: true, sticky_messages: []}}, dynamic_currently_linking: [], info: %{}}, playback: %Membrane.Core.Playback{async_state_change: false, pending_state: nil, state: :playing, target_locked?: false, target_state: :playing}, playback_buffer: %Membrane.Core.Element.PlaybackBuffer{q: #Qex<[]>}, type: :sink, watcher: #PID<0.300.0>}], [file: 'lib/bunch.ex', line: 158]}, {Membrane.Core.Element.MessageDispatcher, :handle_message, 3, [file: 'lib/membrane/core/element/message_dispatcher.ex', line: 20]}, {Membrane.Element, :handle_info, 2, [file: 'lib/membrane/element.ex', line: 270]}, {:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 637]}, {:gen_server, :handle_msg, 6, [file: 'gen_server.erl', line: 711]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 249]}]}
                                                                                                                Element :sink: Terminating: Attempt to terminate element when it is not stopped

                                                    state: %Membrane.Core.Element.State{controlling_pid: #PID<0.300.0>, delayed_demands: %{}, internal_state: %{endpoint_id: :default, latency: :high, native: #Reference<0.2486223907.3552968708.126269>, portaudio_buffer_size: 256, ringbuffer_size: 4096}, module: Membrane.Element.PortAudio.Sink, name: :sink, pads: %{data: %{input: %Membrane.Element.Pad.Data{accepted_caps: {Membrane.Caps.Audio.Raw, [channels: 2, sample_rate: 48000, format: :s16le]}, availability: :always, buffer: %Membrane.Core.PullBuffer{current_size: 0, demand: 65536, demand_pid: #PID<0.305.0>, input_ref: :output, metric: Membrane.Buffer.Metric.ByteSize, min_demand: 16384, name: :sink, preferred_size: 65536, q: #Qex<[]>, toilet: %{fail: 262144, warn: 131072}}, caps: nil, current_id: nil, demand: 16384, demand_unit: :bytes, direction: :input, end_of_stream?: false, mode: :pull, other_demand_unit: nil, other_ref: :output, pid: #PID<0.305.0>, start_of_stream?: false, sticky_messages: []}}, dynamic_currently_linking: [], info: %{}}, playback: %Membrane.Core.Playback{async_state_change: false, pending_state: nil, state: :playing, target_locked?: false, target_state: :playing}, playback_buffer: %Membrane.Core.Element.PlaybackBuffer{q: #Qex<[]>}, type: :sink, watcher: #PID<0.300.0>}
                                                           Stacktrace:
                                                                          (membrane_core) lib/membrane/core/element/lifecycle_controller.ex:72: Membrane.Core.Element.LifecycleController.handle_shutdown/2
                                                                    (membrane_core) lib/membrane/core/element/message_dispatcher.ex:20: Membrane.Core.Element.MessageDispatcher.handle_message/3
                                                         (membrane_core) lib/membrane/element.ex:251: Membrane.Element.terminate/2
                                                                                                                                      (stdlib) gen_server.erl:673: :gen_server.try_terminate/3
                                                       (stdlib) gen_server.erl:858: :gen_server.terminate/10
                                                                                                                (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3



17:05:28.408 [error] GenServer #PID<0.306.0> terminating
** (FunctionClauseError) no function clause matching in Bunch.stateful_try_with_status/1
    (bunch) lib/bunch.ex:158: Bunch.stateful_try_with_status({:error, %Membrane.Core.Element.State{controlling_pid: #PID<0.300.0>, delayed_demands: %{}, internal_state: %{endpoint_id: :default, latency: :high, native: #Reference<0.2486223907.3552968708.126269>, portaudio_buffer_size: 256, ringbuffer_size: 4096}, module: Membrane.Element.PortAudio.Sink, name: :sink, pads: %{data: %{input: %Membrane.Element.Pad.Data{accepted_caps: {Membrane.Caps.Audio.Raw, [channels: 2, sample_rate: 48000, format: :s16le]}, availability: :always, buffer: {:pull_buffer, [toilet: :too_many_buffers]}, caps: nil, current_id: nil, demand: 16384, demand_unit: :bytes, direction: :input, end_of_stream?: false, mode: :pull, other_demand_unit: nil, other_ref: :output, pid: #PID<0.305.0>, start_of_stream?: true, sticky_messages: []}}, dynamic_currently_linking: [], info: %{}}, playback: %Membrane.Core.Playback{async_state_change: false, pending_state: nil, state: :playing, target_locked?: false, target_state: :playing}, playback_buffer: %Membrane.Core.Element.PlaybackBuffer{q: #Qex<[]>}, type: :sink, watcher: #PID<0.300.0>}})
    (membrane_core) lib/membrane/core/element/message_dispatcher.ex:20: Membrane.Core.Element.MessageDispatcher.handle_message/3 
    (membrane_core) lib/membrane/element.ex:270: Membrane.Element.handle_info/2
    (stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:711: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Last message: {Membrane.Core.Message, :buffer, [[%Membrane.Buffer{metadata: %{}, payload: <<0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...>>}], :input]}
State: %Membrane.Core.Element.State{controlling_pid: #PID<0.300.0>, delayed_demands: %{}, internal_state: %{endpoint_id: :default, latency: :high, native: #Reference<0.2486223907.3552968708.126269>, portaudio_buffer_size: 256, ringbuffer_size: 4096}, module: Membrane.Element.PortAudio.Sink, name: :sink, pads: %{data: %{input: %Membrane.Element.Pad.Data{accepted_caps: {Membrane.Caps.Audio.Raw, [channels: 2, sample_rate: 48000, format: :s16le]}, availability: :always, buffer: %Membrane.Core.PullBuffer{current_size: 0, demand: 65536, demand_pid: #PID<0.305.0>, input_ref: :output, metric: Membrane.Buffer.Metric.ByteSize, min_demand: 16384, name: :sink, preferred_size: 65536, q: #Qex<[]>, toilet: %{fail: 262144, warn: 131072}}, caps: nil, current_id: nil, demand: 16384, demand_unit: :bytes, direction: :input, end_of_stream?: false, mode: :pull, other_demand_unit: nil, other_ref: :output, pid: #PID<0.305.0>, start_of_stream?: false, sticky_messages: []}}, dynamic_currently_linking: [], info: %{}}, playback: %Membrane.Core.Playback{async_state_change: false, pending_state: nil, state: :playing, target_locked?: false, target_state: :playing}, playback_buffer: %Membrane.Core.Element.PlaybackBuffer{q: #Qex<[]>}, type: :sink, watcher: #PID<0.300.0>}
2018-12-05T15:43:58.664980Z [warn] [mixer core] Encountered an error.
                                                                     Reason: {:invalid_action, [action: {:demand, {{:dynamic, :input, 0}, :self, 177960}}, callback: :handle_event, module: Membrane.Element.LiveAudioMixer.Source]}
                                                                                         Element :mixer: Elements' Membrane.Element.LiveAudioMixer.Source :handle_event callback returned
                                              invalid action: {:demand, {{:dynamic, :input, 0}, :self, 177960}}. For possible actions are check types
          in Membrane.Element.Action module. Keep in mind that some actions are
                                                                               available in different formats or unavailable for some callbacks,
     element types, playback states or under some other conditions.

                                                                   state: %Membrane.Core.Element.State{controlling_pid: #PID<0.300.0>, delayed_demands: %{}, internal_state: %{caps: %Membrane.Caps.Audio.Raw{channels: 2, format: :s16le, sample_rate: 48000}, delay: 500000000, interval: 1000000000, playing: true, sinks: %{{:dynamic, :input, 0} => %{eos: false, queue: "", skip: 0}}, start_playing_time: -576457814235974775, tick: 1, timer_ref: #Reference<0.2486223907.3552837633.128386>}, module: Membrane.Element.LiveAudioMixer.Source, name: :mixer, pads: %{data: %{:output => %Membrane.Element.Pad.Data{accepted_caps: Membrane.Caps.Audio.Raw, availability: :always, buffer: nil, caps: %Membrane.Caps.Audio.Raw{channels: 2, format: :s16le, sample_rate: 48000}, current_id: nil, demand: nil, demand_unit: nil, direction: :output, end_of_stream?: false, mode: :push, other_demand_unit: :bytes, other_ref: :input, pid: #PID<0.306.0>, start_of_stream?: false, sticky_messages: nil}, {:dynamic, :input, 0} => %Membrane.Element.Pad.Data{accepted_caps: Membrane.Caps.Audio.Raw, availability: :on_request, buffer: %Membrane.Core.PullBuffer{current_size: 0, demand: 0, demand_pid: #PID<0.304.0>, input_ref: :output, metric: Membrane.Buffer.Metric.ByteSize, min_demand: 16384, name: :mixer, preferred_size: 65536, q: #Qex<[]>, toilet: false}, caps: %Membrane.Caps.Audio.Raw{channels: 2, format: :s16le, sample_rate: 48000}, current_id: 1, demand: 0, demand_unit: :bytes, direction: :input, end_of_stream?: false, mode: :pull, other_demand_unit: nil, other_ref: :output, pid: #PID<0.304.0>, start_of_stream?: true, sticky_messages: []}}, dynamic_currently_linking: [], info: %{input: %{accepted_caps: Membrane.Caps.Audio.Raw, availability: :on_request, current_id: 1, demand_unit: :bytes, direction: :input, mode: :pull}}}, playback: %Membrane.Core.Playback{async_state_change: false, pending_state: nil, state: :playing, target_locked?: false, target_state: :playing}, playback_buffer: %Membrane.Core.Element.PlaybackBuffer{q: #Qex<[]>}, type: :filter, watcher: #PID<0.300.0>}
                                                                                                                       Stacktrace:
                                                                                                                                      (membrane_core) lib/membrane/core/element/action_handler.ex:22: Membrane.Core.Element.ActionHandler.handle_action/4
                                                                                                                  (bunch) lib/bunch/enum.ex:121: anonymous fn/3 in Bunch.Enum.try_reduce/3
                                                   (elixir) lib/enum.ex:3281: Enumerable.List.reduce/3
                                                                                                          (elixir) lib/enum.ex:1968: Enum.reduce_while/3
                 (membrane_core) lib/membrane/core/callback_handler.ex:127: Membrane.Core.CallbackHandler.exec_handle_actions/5
                                                                                                                                   (membrane_core) lib/membrane/core/element/event_controller.ex:40: Membrane.Core.Element.EventController.exec_handle_event/4


2018-12-05T15:43:58.665156Z [warn] [core] Error while handling actions returned by callback :handle_event

** (EXIT from #PID<0.271.0>) shell process exited with reason: an exception was raised:
    ** (FunctionClauseError) no function clause matching in Bunch.stateful_try_with_status/1
        (bunch) lib/bunch.ex:158: Bunch.stateful_try_with_status({:error, %Membrane.Core.Element.State{controlling_pid: #PID<0.300.0>, delayed_demands: %{}, internal_state: %{endpoint_id: :default, latency: :high, native: #Reference<0.2486223907.3552968708.126269>, portaudio_buffer_size: 256, ringbuffer_size: 4096}, module: Membrane.Element.PortAudio.Sink, name: :sink, pads: %{data: %{input: %Membrane.Element.Pad.Data{accepted_caps: {Membrane.Caps.Audio.Raw, [channels: 2, sample_rate: 48000, format: :s16le]}, availability: :always, buffer: {:pull_buffer, [toilet: :too_many_buffers]}, caps: nil, current_id: nil, demand: 16384, demand_unit: :bytes, direction: :input, end_of_stream?: false, mode: :pull, other_demand_unit: nil, other_ref: :output, pid: #PID<0.305.0>, start_of_stream?: true, sticky_messages: []}}, dynamic_currently_linking: [], info: %{}}, playback: %Membrane.Core.Playback{async_state_change: false, pending_state: nil, state: :playing, target_locked?: false, target_state: :playing}, playback_buffer: %Membrane.Core.Element.PlaybackBuffer{q: #Qex<[]>}, type: :sink, watcher: #PID<0.300.0>}})
        (membrane_core) lib/membrane/core/element/message_dispatcher.ex:20: Membrane.Core.Element.MessageDispatcher.handle_message/3
        (membrane_core) lib/membrane/element.ex:270: Membrane.Element.handle_info/2
        (stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
        (stdlib) gen_server.erl:711: :gen_server.handle_msg/6
        (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3

here's the pipeline causing error:

defmodule Membrane.Demo.MP3.Pipeline do
  use Membrane.Pipeline
  def handle_init(path_to_mp3) do
    caps = %Membrane.Caps.Audio.Raw{
          format: :s16le,
          sample_rate: 48_000,
          channels: 2
        }

    children = [
      file_src: %Membrane.Element.File.Source{location: path_to_mp3},
      decoder: Membrane.Element.Mad.Decoder,
      converter: %Membrane.Element.FFmpeg.SWResample.Converter{
        output_caps: caps
      },
      mixer: %Membrane.Element.LiveAudioMixer.Source{caps: caps},
      sink: Membrane.Element.PortAudio.Sink
    ]

    links = %{
      {:file_src, :output} => {:decoder, :input},
      {:decoder, :output} => {:converter, :input},
      {:converter, :output} => {:mixer, :input},
      {:mixer, :output} => {:sink, :input, pull_buffer: [toilet: true]}
    }

    spec = %Membrane.Pipeline.Spec{
      children: children,
      links: links
    }

    {{:ok, spec}, %{}}
  end
end

Add metadata structs instead of free form metadata

Currently elements can add some metadata.

We should add mechanism of traits that allow to store "history of a buffer", so e.g. if MP3 frame comes from HTTP source it would be useful to access HTTP headers even if it was decoded.

It should accumulate somehow over passing the pipeline.

Support for different payload types

Currently we use only one payload type - erlang binary. We should support also a native C payload type, mutable and passed by pointer. This is necessary for processing large amount of data (such as high quality video).
We should consider:

  • Way of passing pointer to different NIFs
  • Monitoring lifecycle of payload
  • Adding payload type to caps
  • Size of each payload type should be accessible from elixir with possibly low overhead, because its used frequently to handle demand-related stuff.
  • Payload should be automatically converted by core if type is not supported by element.

Add some element error handling mechanism in pipeline

Currently errors returned from callbacks always cause failure of pipeline, and cannot be handled in any way. We should provide a mechanism that could make it possible recover such errors at pipeline level. The potential problem with this solution is that it will probably require calls from element to pipeline, while such calls in opposite direction happen to be made, what may lead to deadlocks.

Add `paused` playback state

For two reasons:

  • to enable pausing and resuming stream when #55 is fixed
  • to minimize latency when switching to playing state

Refactor `Membrane.Element.Manager` modules

Currently we have manager module per element type, while functionality of each element is very similar. We could try to define managers as handlers of particular messages/actions, such as caps, events, demands, buffers etc. This is related to refactor of Membrane.Element.Manager.Common (#48)

Add sort of bins

We need a new type of element, something like GStreamer's bin but not the same. They should be located somewhere in between pipelines and bins.

Potential use case: a decoder that automatically creates elements decoding any known format.

Refactor callbacks signatures

  • handle_prepare:
handle_prepare(:stopped, state)

to

handle_prepare_to_play(state)

similarly handle_prepate_to_stop

  • handle_{write,process} -> handle_*_list, e.g. handle_write -> handle_write_list
  • handle_{write,process}1 -> handle_*, e.g. handle_write1 -> handle_write
  • def_known_sink_pads -> def_sink_pads, def_known_source_pads -> def_source_pads

Add developer documentation to the guide

We need to have a developer documentation that explain:

  • what are all these controllers, dispatchers and loggers,
  • what is the process architecture being used for pipelines,
  • what kind of messages flow between different parts of the system,
  • what is the meaning of fields in the the internal structures,
  • how pulling works,
  • how integration with native code is designed.

Remove init_size option from PullBuffer

There are two main reasons for this:

  • init_size option conflicts with handling sticky events, and resolving this conflict is difficult and would obfuscate a lot of code,
  • dealing with init_size is not really a responsibility of PullBuffer, and holding it there disables one to use this functionality when a pad is in push mode, which potentially could be a case

Instead, a separate buffer for this purpose may be created, or such option may be added to the PlaybackBuffer

Update elements to API changes

This involves:

  • change pad names to in/out #74
  • utilize playback state from contexts #46
  • update demand actions #66
  • change pad declarations #44
  • update callbacks signatures #47
  • add context to callbacks that lack it #45

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.