membraneframework / membrane_core
The core of the Membrane Framework, advanced multimedia processing framework
Home Page: https://membrane.stream
License: Apache License 2.0
In the stopped state, pads may simply not be linked yet, so sending any message then causes a race condition (RC).
Currently we do not have such mechanism, and each element decides which caps to use itself, which of course will not work in some cases. We should consider when (in which playback state) it is to be done.
The old way of children declaration with parameters was:
name: {module, options, parameters}
Current way supports only
name: %Module{option: value, ...}
or
name: Module
We should make it possible to do:
name: {%Module{option: value, ...}, parameters}
and
name: {Module, parameters}
and remove support for the old way to avoid ambiguities
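The proposed declaration forms could all be reduced to one internal shape. The following is a minimal sketch, not the core's implementation: ChildSpecSketch, FakeSource and normalize/1 are hypothetical names, and parameters are assumed to be a keyword list.

```elixir
defmodule ChildSpecSketch do
  # Hypothetical element module with options, standing in for a real element.
  defmodule FakeSource do
    defstruct location: nil
  end

  # Normalizes each proposed declaration form to {module, options, parameters}.
  # Assumption: parameters are always given as a (keyword) list.
  def normalize({%module{} = options, params}) when is_list(params),
    do: {module, options, params}

  def normalize({module, params}) when is_atom(module) and is_list(params),
    do: {module, nil, params}

  def normalize(%module{} = options), do: {module, options, []}
  def normalize(module) when is_atom(module), do: {module, nil, []}
end
```

Because a struct and a bare module atom are easy to tell apart by pattern matching, the four forms do not collide, which is the ambiguity the old three-element tuple introduced.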
Currently we have only one type of buffer (the %Membrane.Buffer{} struct). We should consider implementing a corresponding buffer struct for each caps, and moving there some metadata that is currently held in caps (such as properties of an MPEG frame that change each time).
Currently, for example, the pipeline receives a notification {:start_of_stream, :input} when the stream starts.
Originally the messages, now called notifications, were created as a way of capturing messages from the elements. They are highly customizable by design: an arbitrary structure can be passed in a notification, as we are not able to predict all possible elements and their use cases.
The Core's callbacks are the reverse: they are well defined and do not change depending on the actual element being used.
IMO we should add new callbacks such as handle_start_of_stream
for all cases of notifications currently issued by non-elements.
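One way to picture the proposal is a dispatcher that turns the core-issued notification into a dedicated callback. This is only a sketch under assumptions: StreamCallbackSketch, dispatch/3 and the arity of handle_start_of_stream are hypothetical and not taken from the core.

```elixir
defmodule StreamCallbackSketch do
  # Translates the {:start_of_stream, pad} notification into a callback call.
  # Falls back to a no-op when the pipeline module does not define the callback.
  def dispatch(module, {:start_of_stream, pad}, state) do
    if function_exported?(module, :handle_start_of_stream, 2) do
      module.handle_start_of_stream(pad, state)
    else
      {:ok, state}
    end
  end
end

defmodule StreamCallbackSketch.DemoPipeline do
  # Hypothetical pipeline that records which pads have started streaming.
  def handle_start_of_stream(pad, started), do: {:ok, [pad | started]}
end
```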
Consider the following scenario:
X is spawned
X, who's changing to playing and does not receive message yet
X goes from stopped to prepared and notifies the pipeline, all other children go from playing to prepared and also notify the pipeline
X switches to playing, because its target state has not been updated yet
X finally receives the request that others received at 4.
X goes to prepared
In points 7. and 8. X is in playing, and the pipeline is in prepared, which is unwanted. Although this does not seem to be a serious bug, it may cause problems. Informing the pipeline of both the new and the target playback state might be a fix.
Profits:
Please share your propositions if you come up with better names.
paused state if we add it in the future (#64)
For example, demand of size k should result in:
To make this usable, we should also add some demand-converting functionality: each caps should export a function converting a k-element demand to an equivalent size in bytes (possibly approximate).
Caps changing dynamically may introduce some problems to this approach, so the place, time and way of converting sizes need to be carefully chosen. We should also keep in mind that caps may not be present when the first demand is executed.
Demands in bytes may also need to be supported even with caps other than bytestream; for example, when a ringbuffer has a fixed size, this would be convenient.
We should also consider always declaring the PullBuffer size in bytes, to avoid mistakes.
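The conversion described above could look roughly like this. It is a sketch under stated assumptions: the caps structs, their fields and demand_to_bytes/2 are hypothetical, and an element is assumed to be one audio frame (one sample per channel) for raw audio and one byte for a bytestream.

```elixir
defmodule DemandConversionSketch do
  # Hypothetical raw-audio caps; sample_size is in bytes per sample.
  defmodule RawAudioCaps do
    defstruct channels: 2, sample_size: 2
  end

  # Hypothetical bytestream caps, where one element is one byte.
  defmodule BytestreamCaps do
    defstruct []
  end

  # Converts a demand of k elements to an equivalent size in bytes.
  def demand_to_bytes(%RawAudioCaps{channels: c, sample_size: s}, k), do: {:ok, k * c * s}
  def demand_to_bytes(%BytestreamCaps{}, k), do: {:ok, k}
  # Caps may not be present yet when the first demand is executed.
  def demand_to_bytes(nil, _k), do: {:error, :no_caps}
end
```

Returning an error for missing caps makes the "no caps at first demand" case explicit, so the caller has to decide whether to delay the demand or fall back to bytes.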
The current way of sharing code between modules in the core is putting that code inside the __using__ macro. As this approach has some disadvantages, and we should aim to limit the amount of generated code, we need to refactor those modules. One way is to pass the name of the module containing the implementation to each function of the abstraction. Some macros may also be used to improve this approach.
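Passing the implementing module as an argument could be sketched as follows. The module and function names here (SharedLogicSketch, CounterElementSketch, handle_other/2) are hypothetical and only illustrate the idea.

```elixir
defmodule SharedLogicSketch do
  # Shared logic is a plain function that receives the callback module,
  # instead of being generated into every module by __using__.
  def handle_message(module, message, state) do
    case module.handle_other(message, state) do
      {:ok, new_state} -> {:ok, new_state}
      {:error, reason} -> {:error, {module, reason}}
    end
  end
end

defmodule CounterElementSketch do
  # Hypothetical implementation module: only the element-specific part lives here.
  def handle_other(:inc, count), do: {:ok, count + 1}
  def handle_other(other, _count), do: {:error, {:unknown_message, other}}
end
```

With this layout the shared code is compiled once, and stack traces point at SharedLogicSketch rather than at code injected into each element.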
Modules to be refactored:
The documentation does not say how to set up toilet mode for PullBuffer. Also, the API should be reconsidered (toilet should not be hidden under the pull_buffer key).
We should allow an element to request demands on a pad even if it has already been unlinked, and remove the pad only when its buffers become empty. We should also ensure that nothing is sent through such a pad, and fail when something arrives on it.
Now, this is undefined behaviour. Ignoring it would remove the need for checking in elements whether buffers list is empty or not.
Modules from Membrane Core should be grouped in docs. This can be achieved by editing mix.exs
See the docs
Currently:
handle_demand always receives the entire demand, and returning demand overrides the previous demand
handle_demand receives only the new demand, except when caused by the redemand action, in which case it receives the entire demand
demand means adding it to the existing demand
Imho we should change this, so that:
handle_demand
demand always causes overriding the existing demand
As in some cases different behaviour is required, we could:
demand: {:sink, & &1+1}
handle_demand in context
Today I tried to use an invalid element (one that issues demand in handle_prepare)
defmodule Membrane.Element.Fake.Sink.Bytes do
  use Membrane.Element.Base.Sink

  def_known_sink_pads %{
    :sink => {:always, {:pull, demand_in: :bytes}, :any}
  }

  def handle_init(_) do
    {:ok, nil}
  end

  def handle_prepare(_, _) do
    {{:ok, demand: {:sink, 65535}}, nil}
  end

  def handle_write1(:sink, %Membrane.Buffer{payload: payload}, _, _) do
    {{:ok, demand: {:sink, byte_size(payload)}}, nil}
  end
end
I got the following error:
** (EXIT from #PID<0.266.0>) shell process exited with reason: {:error, {:cannot_handle_message, [message: {:membrane_change_playback_state, :prepared}, mode: :info, reason: {:error_handling_actions, {:cannot_handle_demand, [playback_state: :stopped, callback: :handle_prepare]}}]}}
Interactive Elixir (1.6.1) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)>
22:58:15.463 [error] GenServer #PID<0.273.0> terminating
** (stop) {:error, {:cannot_handle_message, [message: {:membrane_change_playback_state, :prepared}, mode: :info, reason: {:error_handling_actions, {:cannot_handle_demand, [playback_state: :stopped, callback: :handle_prepare]}}]}}
Last message: {:membrane_change_playback_state, :prepared}
State: %Membrane.Element.Manager.State{controlling_pid: #PID<0.268.0>, internal_state: %Membrane.Element.Manager.State{controlling_pid: #PID<0.268.0>, internal_state: nil, message_bus: #PID<0.268.0>, module: Membrane.Element.Fake.Sink.Bytes, name: :sink, pads: %{data: %{sink: %{accepted_caps: :any, buffer: %Membrane.PullBuffer{current_size: 0, demand: 0, init_size: 0, metric: Membrane.Buffer.Metric.ByteSize, min_demand: 7500, name: :sink, preferred_size: 30000, q: #Qex<[]>, sink: {#PID<0.272.0>, :source}, sink_name: :sink, toilet: false}, caps: nil, direction: :sink, eos: false, is_dynamic: false, mode: :pull, name: :sink, options: %{demand_in: :bytes}, other_name: :source, pid: #PID<0.272.0>, self_demand: 0, sos: false, sticky_messages: []}}, dynamic_currently_linking: [], info: %{}}, playback: %Membrane.Mixins.Playback{async_state_change: false, pending_state: nil, state: :stopped, target_locked?: false, target_state: :prepared}, playback_buffer: %Membrane.Element.Manager.PlaybackBuffer{q: #Qex<[]>}}, message_bus: #PID<0.268.0>, module: Membrane.Element.Fake.Sink.Bytes, name: :sink, pads: %{data: %{sink: %{accepted_caps: :any, buffer: %Membrane.PullBuffer{current_size: 0, demand: 0, init_size: 0, metric: Membrane.Buffer.Metric.ByteSize, min_demand: 7500, name: :sink, preferred_size: 30000, q: #Qex<[]>, sink: {#PID<0.272.0>, :source}, sink_name: :sink, toilet: false}, caps: nil, direction: :sink, eos: false, is_dynamic: false, mode: :pull, name: :sink, options: %{demand_in: :bytes}, other_name: :source, pid: #PID<0.272.0>, self_demand: 0, sos: false, sticky_messages: []}}, dynamic_currently_linking: [], info: %{}}, playback: %Membrane.Mixins.Playback{async_state_change: false, pending_state: nil, state: :stopped, target_locked?: false, target_state: :prepared}, playback_buffer: %Membrane.Element.Manager.PlaybackBuffer{q: #Qex<[]>}}
but I expected to see dev-friendly message defined in https://github.com/membraneframework/membrane-core/blob/master/lib/membrane_core/element/manager/action.ex#L84
There are numerous locations in the core where we're using structs but we don't need them to have all benefits of structs, maybe using records will be better? They might be slightly faster.
https://groups.google.com/forum/#!msg/elixir-lang-talk/6kn7J2XnFg8/I5poTNCEHwAJ
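For comparison, a record-based representation could be sketched with Elixir's standard Record module. PadRecordSketch and its fields are hypothetical; whether this is actually faster than a struct in the core would need to be measured.

```elixir
defmodule PadRecordSketch do
  require Record

  # Defines the pad/0, pad/1 and pad/2 macros; a record is a plain tagged tuple.
  Record.defrecord(:pad, name: nil, mode: :pull, demand: 0)

  def new(name), do: pad(name: name)

  # Records can be pattern matched and updated by field name.
  def add_demand(pad(demand: d) = p, size), do: pad(p, demand: d + size)

  def demand(p), do: pad(p, :demand)
end
```

The trade-off is that records lose struct conveniences such as protocol dispatch and map-style access, and printing shows a bare tuple.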
Instead it should return demand size, and demand should be sent from element. This can improve PullBuffer to be written in more functional way, as it should be.
The current message is not really self-explanatory if you're pointing to a non-existent element while declaring links for pipeline spec.
* (EXIT from #PID<0.470.0>) shell process exited with reason: an exception was raised:
** (WithClauseError) no with clause matching: {{:error, {:resolve_link, %{element: :seeker, pad: :sink}, {:unknown_child, :seeker}}}, %Membrane.Pipeline.State{children_ids: %{}, children_to_pids: %{mad: #PID<0.474.0>, parser: #PID<0.475.0>, peakmeter: #PID<0.476.0>, sink: #PID<0.477.0>, source: #PID<0.478.0>}, internal_state: %{}, module: Twist.AutoAudioTrim.Trimmer.Pipeline, pending_pids: nil, pids_to_children: %{#PID<0.474.0> => :mad, #PID<0.475.0> => :parser, #PID<0.476.0> => :peakmeter, #PID<0.477.0> => :sink, #PID<0.478.0> => :source}, playback: %Membrane.Mixins.Playback{async_state_change: false, pending_state: nil, state: :stopped, target_locked?: false, target_state: :stopped}, terminating?: false}}
(membrane_core) lib/membrane_core/pipeline.ex:192: Membrane.Pipeline.handle_spec/2
(membrane_core) lib/membrane_core/pipeline.ex:451: Membrane.Pipeline.handle_info/2
(stdlib) gen_server.erl:616: :gen_server.try_dispatch/4
(stdlib) gen_server.erl:686: :gen_server.handle_msg/6
(stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Interactive Elixir (1.6.1) - press Ctrl+C to exit (type h() ENTER for help)
iex(2)>
22:54:26.922 [error] GenServer #PID<0.472.0> terminating
** (WithClauseError) no with clause matching: {{:error, {:resolve_link, %{element: :seeker, pad: :sink}, {:unknown_child, :seeker}}}, %Membrane.Pipeline.State{children_ids: %{}, children_to_pids: %{mad: #PID<0.474.0>, parser: #PID<0.475.0>, peakmeter: #PID<0.476.0>, sink: #PID<0.477.0>, source: #PID<0.478.0>}, internal_state: %{}, module: Twist.AutoAudioTrim.Trimmer.Pipeline, pending_pids: nil, pids_to_children: %{#PID<0.474.0> => :mad, #PID<0.475.0> => :parser, #PID<0.476.0> => :peakmeter, #PID<0.477.0> => :sink, #PID<0.478.0> => :source}, playback: %Membrane.Mixins.Playback{async_state_change: false, pending_state: nil, state: :stopped, target_locked?: false, target_state: :stopped}, terminating?: false}}
(membrane_core) lib/membrane_core/pipeline.ex:192: Membrane.Pipeline.handle_spec/2
(membrane_core) lib/membrane_core/pipeline.ex:451: Membrane.Pipeline.handle_info/2
(stdlib) gen_server.erl:616: :gen_server.try_dispatch/4
(stdlib) gen_server.erl:686: :gen_server.handle_msg/6
(stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message: [:membrane_pipeline_spec, %Membrane.Pipeline.Spec{children: %{mad: Membrane.Element.Mad.Decoder, parser: Membrane.Element.MPEGAudioParse.Parser, peakmeter: {Membrane.Element.Audiometer.Peakmeter, %Membrane.Element.Audiometer.Peakmeter.Options{period: 100}}, sink: Membrane.Element.Fake.Sink.Bytes, source: {Membrane.Element.HTTPoison.Source, %Membrane.Element.HTTPoison.Source.Options{body: "", headers: [Connection: "keep-alive", Accept: "audio/mpeg", "User-Agent": "RadioKit Twist: AutoAudioTrim"], location: "http://localhost:8000/variant.mp3", method: :get, options: nil}}}, links: %{{:mad, :source} => {:seeker, :sink, [pull_buffer: [preferred_size: 30000]]}, {:parser, :source} => {:mad, :sink, [pull_buffer: [preferred_size: 30000]]}, {:peakmeter, :source} => {:sink, :sink, [pull_buffer: [preferred_size: 30000]]}, {:source, :source} => {:parser, :sink, [pull_buffer: [preferred_size: 30000]]}}}]
State: %Membrane.Pipeline.State{children_ids: %{}, children_to_pids: %{}, internal_state: %{}, module: Twist.AutoAudioTrim.Trimmer.Pipeline, pending_pids: nil, pids_to_children: %{}, playback: %Membrane.Mixins.Playback{async_state_change: false, pending_state: nil, state: :stopped, target_locked?: false, target_state: :stopped}, terminating?: false}
At least add a props_t type that will be referenced from Membrane.Pipeline.Spec
Currently the PullBuffer size is configurable but fixed during playback. We should consider whether it would help to adjust it based on the current demand. This is most important on the change to the playing state, because huge demands occur then, which might affect stream fluency and cause significant processor usage at that moment.
Now we're using
{{:ok, %Spec{children: children, links: links}}, %{}}
but erlang way is
{:ok, {%Spec{children: children, links: links}}, %{}}
it will probably also allow to do slightly faster matching as you don't have to go into two tuples to check if result was ok
This pattern is spread across the whole framework, and this may be the last moment to fix it.
@mat-hek @bblaszkow06 @mtsznowak please comment on this
We need sort of callbacks in elements that are made in uniform fashion.
Use cases:
imagine you have an element that is a sink that is a HTTPS server (think: icecast replacement). It receives a SSL-enabled request and during handshake it has to pick up right SSL certificate based on the domain received via SNI. It has to be synchronously called back.
imagine you have an element that is a source to JACK and you want to have custom handler on JACK xrun (underrun)
I would like to have an API similar to def_options
that specifies all possible callbacks.
{{:dynamic, pad_name}, {availability, mode, caps}}
to
pad_name: {:on_request, mode, caps}
sink_pad_name: {availability, {:pull, demand_in: :buffers}, caps}
to
sink_pad_name: [availability: :cos, caps: adkald, mode: :pull, demand_in: :buffers]
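The change from the tuple form to the keyword form can be expressed as a small conversion. This is an illustrative sketch only: PadSpecSketch and to_keyword/1 are hypothetical names, and the key order in the result is an arbitrary choice.

```elixir
defmodule PadSpecSketch do
  # Converts {availability, {mode, opts}, caps} to a flat keyword list.
  def to_keyword({availability, {mode, opts}, caps}) when is_list(opts),
    do: [availability: availability, caps: caps, mode: mode] ++ opts

  # Converts {availability, mode, caps}, where mode carries no extra options.
  def to_keyword({availability, mode, caps}) when is_atom(mode),
    do: [availability: availability, caps: caps, mode: mode]
end
```

For example, PadSpecSketch.to_keyword({:always, {:pull, demand_in: :buffers}, :any}) yields a keyword list in which demand_in sits at the same level as mode.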
When the pipeline is stopped because of a toilet buffer overflow, the error is not handled properly and the pipeline fails with a function clause error in Bunch:
2018-12-05T15:43:58.631636Z [warn] [core] PullBuffer :sink (toilet): received 288000 buffers,
which is above fail_level, from input :output that works in push mode.
To have control over amount of buffers being produced, consider using push mode.
If this is a normal situation, increase toilet warn/fail level.
Buffers: %Membrane.Buffer{metadata: %{}, payload: <<0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...>>}
PullBuffer %Membrane.Core.PullBuffer{current_size: 288000, demand: 65536, demand_pid: #PID<0.305.0>, input_ref: :output, metric: Membrane.Buffer.Metric.ByteSize, min_demand: 16384, name: :sink, preferred_size: 65536, q: #Qex<[{:buffers, [%Membrane.Buffer{metadata: %{}, payload: <<0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...>>}], 288000}]>, toilet: %{fail: 262144, warn: 131072}}
2018-12-05T15:43:58.634813Z [warn] [core] Encountered an error.
Reason: {:pull_buffer, [toilet: :too_many_buffers]}
PullBuffer :sink (toilet): failing: too many buffers
Stacktrace:
(membrane_core) lib/membrane/core/pull_buffer.ex:139: Membrane.Core.PullBuffer.store/3
(membrane_core) lib/membrane/core/element/buffer_controller.ex:79: anonymous fn/2 in Membrane.Core.Element.BufferController.handle_buffer_pull/3
(elixir) lib/map.ex:791: Map.get_and_update/3
(elixir) lib/access.ex:370: Access.get_and_update/3
(elixir) lib/map.ex:791: Map.get_and_update/3
(elixir) lib/access.ex:370: Access.get_and_update/3
2018-12-05T15:43:58.636849Z [warn] [core] PullBuffer :sink (toilet): received 288000 buffers,
which is above fail_level, from input :output that works in push mode.
To have control over amount of buffers being produced, consider using push mode.
If this is a normal situation, increase toilet warn/fail level.
Buffers: %Membrane.Buffer{metadata: %{}, payload: <<0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...>>}
PullBuffer %Membrane.Core.PullBuffer{current_size: 288000, demand: 65536, demand_pid: #PID<0.305.0>, input_ref: :output, metric: Membrane.Buffer.Metric.ByteSize, min_demand: 16384, name: :sink, preferred_size: 65536, q: #Qex<[{:buffers, [%Membrane.Buffer{metadata: %{}, payload: <<0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...>>}], 288000}]>, toilet: %{fail: 262144, warn: 131072}}
2018-12-05T15:43:58.639087Z [warn] [core] Encountered an error.
Reason: {:pull_buffer, [toilet: :too_many_buffers]}
PullBuffer :sink (toilet): failing: too many buffers
Stacktrace:
(membrane_core) lib/membrane/core/pull_buffer.ex:139: Membrane.Core.PullBuffer.store/3
(elixir) lib/map.ex:791: Map.get_and_update/3
(elixir) lib/access.ex:370: Access.get_and_update/3
(elixir) lib/map.ex:791: Map.get_and_update/3
(elixir) lib/access.ex:370: Access.get_and_update/3
(membrane_core) lib/membrane/core/element/buffer_controller.ex:72: Membrane.Core.Element.BufferController.handle_buffer_pull/3
2018-12-05T15:43:58.647542Z [warn] [sink core] Encountered an error.
Reason: {:function_clause, [{Bunch, :stateful_try_with_status, [error: %Membrane.Core.Element.State{controlling_pid: #PID<0.300.0>, delayed_demands: %{}, internal_state: %{endpoint_id: :default, latency: :high, native: #Reference<0.2486223907.3552968708.126269>, portaudio_buffer_size: 256, ringbuffer_size: 4096}, module: Membrane.Element.PortAudio.Sink, name: :sink, pads: %{data: %{input: %Membrane.Element.Pad.Data{accepted_caps: {Membrane.Caps.Audio.Raw, [channels: 2, sample_rate: 48000, format: :s16le]}, availability: :always, buffer: {:pull_buffer, [toilet: :too_many_buffers]}, caps: nil, current_id: nil, demand: 16384, demand_unit: :bytes, direction: :input, end_of_stream?: false, mode: :pull, other_demand_unit: nil, other_ref: :output, pid: #PID<0.305.0>, start_of_stream?: true, sticky_messages: []}}, dynamic_currently_linking: [], info: %{}}, playback: %Membrane.Core.Playback{async_state_change: false, pending_state: nil, state: :playing, target_locked?: false, target_state: :playing}, playback_buffer: %Membrane.Core.Element.PlaybackBuffer{q: #Qex<[]>}, type: :sink, watcher: #PID<0.300.0>}], [file: 'lib/bunch.ex', line: 158]}, {Membrane.Core.Element.MessageDispatcher, :handle_message, 3, [file: 'lib/membrane/core/element/message_dispatcher.ex', line: 20]}, {Membrane.Element, :handle_info, 2, [file: 'lib/membrane/element.ex', line: 270]}, {:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 637]}, {:gen_server, :handle_msg, 6, [file: 'gen_server.erl', line: 711]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 249]}]}
Element :sink: Terminating: Attempt to terminate element when it is not stopped
state: %Membrane.Core.Element.State{controlling_pid: #PID<0.300.0>, delayed_demands: %{}, internal_state: %{endpoint_id: :default, latency: :high, native: #Reference<0.2486223907.3552968708.126269>, portaudio_buffer_size: 256, ringbuffer_size: 4096}, module: Membrane.Element.PortAudio.Sink, name: :sink, pads: %{data: %{input: %Membrane.Element.Pad.Data{accepted_caps: {Membrane.Caps.Audio.Raw, [channels: 2, sample_rate: 48000, format: :s16le]}, availability: :always, buffer: %Membrane.Core.PullBuffer{current_size: 0, demand: 65536, demand_pid: #PID<0.305.0>, input_ref: :output, metric: Membrane.Buffer.Metric.ByteSize, min_demand: 16384, name: :sink, preferred_size: 65536, q: #Qex<[]>, toilet: %{fail: 262144, warn: 131072}}, caps: nil, current_id: nil, demand: 16384, demand_unit: :bytes, direction: :input, end_of_stream?: false, mode: :pull, other_demand_unit: nil, other_ref: :output, pid: #PID<0.305.0>, start_of_stream?: false, sticky_messages: []}}, dynamic_currently_linking: [], info: %{}}, playback: %Membrane.Core.Playback{async_state_change: false, pending_state: nil, state: :playing, target_locked?: false, target_state: :playing}, playback_buffer: %Membrane.Core.Element.PlaybackBuffer{q: #Qex<[]>}, type: :sink, watcher: #PID<0.300.0>}
Stacktrace:
(membrane_core) lib/membrane/core/element/lifecycle_controller.ex:72: Membrane.Core.Element.LifecycleController.handle_shutdown/2
(membrane_core) lib/membrane/core/element/message_dispatcher.ex:20: Membrane.Core.Element.MessageDispatcher.handle_message/3
(membrane_core) lib/membrane/element.ex:251: Membrane.Element.terminate/2
(stdlib) gen_server.erl:673: :gen_server.try_terminate/3
(stdlib) gen_server.erl:858: :gen_server.terminate/10
(stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
17:05:28.408 [error] GenServer #PID<0.306.0> terminating
** (FunctionClauseError) no function clause matching in Bunch.stateful_try_with_status/1
(bunch) lib/bunch.ex:158: Bunch.stateful_try_with_status({:error, %Membrane.Core.Element.State{controlling_pid: #PID<0.300.0>, delayed_demands: %{}, internal_state: %{endpoint_id: :default, latency: :high, native: #Reference<0.2486223907.3552968708.126269>, portaudio_buffer_size: 256, ringbuffer_size: 4096}, module: Membrane.Element.PortAudio.Sink, name: :sink, pads: %{data: %{input: %Membrane.Element.Pad.Data{accepted_caps: {Membrane.Caps.Audio.Raw, [channels: 2, sample_rate: 48000, format: :s16le]}, availability: :always, buffer: {:pull_buffer, [toilet: :too_many_buffers]}, caps: nil, current_id: nil, demand: 16384, demand_unit: :bytes, direction: :input, end_of_stream?: false, mode: :pull, other_demand_unit: nil, other_ref: :output, pid: #PID<0.305.0>, start_of_stream?: true, sticky_messages: []}}, dynamic_currently_linking: [], info: %{}}, playback: %Membrane.Core.Playback{async_state_change: false, pending_state: nil, state: :playing, target_locked?: false, target_state: :playing}, playback_buffer: %Membrane.Core.Element.PlaybackBuffer{q: #Qex<[]>}, type: :sink, watcher: #PID<0.300.0>}})
(membrane_core) lib/membrane/core/element/message_dispatcher.ex:20: Membrane.Core.Element.MessageDispatcher.handle_message/3
(membrane_core) lib/membrane/element.ex:270: Membrane.Element.handle_info/2
(stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
(stdlib) gen_server.erl:711: :gen_server.handle_msg/6
(stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Last message: {Membrane.Core.Message, :buffer, [[%Membrane.Buffer{metadata: %{}, payload: <<0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...>>}], :input]}
State: %Membrane.Core.Element.State{controlling_pid: #PID<0.300.0>, delayed_demands: %{}, internal_state: %{endpoint_id: :default, latency: :high, native: #Reference<0.2486223907.3552968708.126269>, portaudio_buffer_size: 256, ringbuffer_size: 4096}, module: Membrane.Element.PortAudio.Sink, name: :sink, pads: %{data: %{input: %Membrane.Element.Pad.Data{accepted_caps: {Membrane.Caps.Audio.Raw, [channels: 2, sample_rate: 48000, format: :s16le]}, availability: :always, buffer: %Membrane.Core.PullBuffer{current_size: 0, demand: 65536, demand_pid: #PID<0.305.0>, input_ref: :output, metric: Membrane.Buffer.Metric.ByteSize, min_demand: 16384, name: :sink, preferred_size: 65536, q: #Qex<[]>, toilet: %{fail: 262144, warn: 131072}}, caps: nil, current_id: nil, demand: 16384, demand_unit: :bytes, direction: :input, end_of_stream?: false, mode: :pull, other_demand_unit: nil, other_ref: :output, pid: #PID<0.305.0>, start_of_stream?: false, sticky_messages: []}}, dynamic_currently_linking: [], info: %{}}, playback: %Membrane.Core.Playback{async_state_change: false, pending_state: nil, state: :playing, target_locked?: false, target_state: :playing}, playback_buffer: %Membrane.Core.Element.PlaybackBuffer{q: #Qex<[]>}, type: :sink, watcher: #PID<0.300.0>}
2018-12-05T15:43:58.664980Z [warn] [mixer core] Encountered an error.
Reason: {:invalid_action, [action: {:demand, {{:dynamic, :input, 0}, :self, 177960}}, callback: :handle_event, module: Membrane.Element.LiveAudioMixer.Source]}
Element :mixer: Elements' Membrane.Element.LiveAudioMixer.Source :handle_event callback returned
invalid action: {:demand, {{:dynamic, :input, 0}, :self, 177960}}. For possible actions are check types
in Membrane.Element.Action module. Keep in mind that some actions are
available in different formats or unavailable for some callbacks,
element types, playback states or under some other conditions.
state: %Membrane.Core.Element.State{controlling_pid: #PID<0.300.0>, delayed_demands: %{}, internal_state: %{caps: %Membrane.Caps.Audio.Raw{channels: 2, format: :s16le, sample_rate: 48000}, delay: 500000000, interval: 1000000000, playing: true, sinks: %{{:dynamic, :input, 0} => %{eos: false, queue: "", skip: 0}}, start_playing_time: -576457814235974775, tick: 1, timer_ref: #Reference<0.2486223907.3552837633.128386>}, module: Membrane.Element.LiveAudioMixer.Source, name: :mixer, pads: %{data: %{:output => %Membrane.Element.Pad.Data{accepted_caps: Membrane.Caps.Audio.Raw, availability: :always, buffer: nil, caps: %Membrane.Caps.Audio.Raw{channels: 2, format: :s16le, sample_rate: 48000}, current_id: nil, demand: nil, demand_unit: nil, direction: :output, end_of_stream?: false, mode: :push, other_demand_unit: :bytes, other_ref: :input, pid: #PID<0.306.0>, start_of_stream?: false, sticky_messages: nil}, {:dynamic, :input, 0} => %Membrane.Element.Pad.Data{accepted_caps: Membrane.Caps.Audio.Raw, availability: :on_request, buffer: %Membrane.Core.PullBuffer{current_size: 0, demand: 0, demand_pid: #PID<0.304.0>, input_ref: :output, metric: Membrane.Buffer.Metric.ByteSize, min_demand: 16384, name: :mixer, preferred_size: 65536, q: #Qex<[]>, toilet: false}, caps: %Membrane.Caps.Audio.Raw{channels: 2, format: :s16le, sample_rate: 48000}, current_id: 1, demand: 0, demand_unit: :bytes, direction: :input, end_of_stream?: false, mode: :pull, other_demand_unit: nil, other_ref: :output, pid: #PID<0.304.0>, start_of_stream?: true, sticky_messages: []}}, dynamic_currently_linking: [], info: %{input: %{accepted_caps: Membrane.Caps.Audio.Raw, availability: :on_request, current_id: 1, demand_unit: :bytes, direction: :input, mode: :pull}}}, playback: %Membrane.Core.Playback{async_state_change: false, pending_state: nil, state: :playing, target_locked?: false, target_state: :playing}, playback_buffer: %Membrane.Core.Element.PlaybackBuffer{q: #Qex<[]>}, type: :filter, watcher: #PID<0.300.0>}
Stacktrace:
(membrane_core) lib/membrane/core/element/action_handler.ex:22: Membrane.Core.Element.ActionHandler.handle_action/4
(bunch) lib/bunch/enum.ex:121: anonymous fn/3 in Bunch.Enum.try_reduce/3
(elixir) lib/enum.ex:3281: Enumerable.List.reduce/3
(elixir) lib/enum.ex:1968: Enum.reduce_while/3
(membrane_core) lib/membrane/core/callback_handler.ex:127: Membrane.Core.CallbackHandler.exec_handle_actions/5
(membrane_core) lib/membrane/core/element/event_controller.ex:40: Membrane.Core.Element.EventController.exec_handle_event/4
2018-12-05T15:43:58.665156Z [warn] [core] Error while handling actions returned by callback :handle_event
** (EXIT from #PID<0.271.0>) shell process exited with reason: an exception was raised:
** (FunctionClauseError) no function clause matching in Bunch.stateful_try_with_status/1
(bunch) lib/bunch.ex:158: Bunch.stateful_try_with_status({:error, %Membrane.Core.Element.State{controlling_pid: #PID<0.300.0>, delayed_demands: %{}, internal_state: %{endpoint_id: :default, latency: :high, native: #Reference<0.2486223907.3552968708.126269>, portaudio_buffer_size: 256, ringbuffer_size: 4096}, module: Membrane.Element.PortAudio.Sink, name: :sink, pads: %{data: %{input: %Membrane.Element.Pad.Data{accepted_caps: {Membrane.Caps.Audio.Raw, [channels: 2, sample_rate: 48000, format: :s16le]}, availability: :always, buffer: {:pull_buffer, [toilet: :too_many_buffers]}, caps: nil, current_id: nil, demand: 16384, demand_unit: :bytes, direction: :input, end_of_stream?: false, mode: :pull, other_demand_unit: nil, other_ref: :output, pid: #PID<0.305.0>, start_of_stream?: true, sticky_messages: []}}, dynamic_currently_linking: [], info: %{}}, playback: %Membrane.Core.Playback{async_state_change: false, pending_state: nil, state: :playing, target_locked?: false, target_state: :playing}, playback_buffer: %Membrane.Core.Element.PlaybackBuffer{q: #Qex<[]>}, type: :sink, watcher: #PID<0.300.0>}})
(membrane_core) lib/membrane/core/element/message_dispatcher.ex:20: Membrane.Core.Element.MessageDispatcher.handle_message/3
(membrane_core) lib/membrane/element.ex:270: Membrane.Element.handle_info/2
(stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
(stdlib) gen_server.erl:711: :gen_server.handle_msg/6
(stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
here's the pipeline causing error:
defmodule Membrane.Demo.MP3.Pipeline do
  use Membrane.Pipeline

  def handle_init(path_to_mp3) do
    caps = %Membrane.Caps.Audio.Raw{
      format: :s16le,
      sample_rate: 48_000,
      channels: 2
    }

    children = [
      file_src: %Membrane.Element.File.Source{location: path_to_mp3},
      decoder: Membrane.Element.Mad.Decoder,
      converter: %Membrane.Element.FFmpeg.SWResample.Converter{
        output_caps: caps
      },
      mixer: %Membrane.Element.LiveAudioMixer.Source{caps: caps},
      sink: Membrane.Element.PortAudio.Sink
    ]

    links = %{
      {:file_src, :output} => {:decoder, :input},
      {:decoder, :output} => {:converter, :input},
      {:converter, :output} => {:mixer, :input},
      {:mixer, :output} => {:sink, :input, pull_buffer: [toilet: true]}
    }

    spec = %Membrane.Pipeline.Spec{
      children: children,
      links: links
    }

    {{:ok, spec}, %{}}
  end
end
Currently elements can add some metadata.
We should add mechanism of traits that allow to store "history of a buffer", so e.g. if MP3 frame comes from HTTP source it would be useful to access HTTP headers even if it was decoded.
It should accumulate somehow over passing the pipeline.
There is no way to generate documentation for NIF defined via Unifex specs
This involves:
Currently we use only one payload type - erlang binary. We should support also a native C payload type, mutable and passed by pointer. This is necessary for processing large amount of data (such as high quality video).
We should consider:
We should add a mechanism enabling us to easier synchronize multiple streams. Probably we end up timestamping buffers.
Currently, errors returned from callbacks always cause failure of the pipeline and cannot be handled in any way. We should provide a mechanism that makes it possible to recover from such errors at the pipeline level. The potential problem with this solution is that it will probably require calls from element to pipeline, while calls in the opposite direction are also made, which may lead to deadlocks.
The Membrane.Element docs for v0.2.0 have the following description
Alias for change_playback_state(pid, :stopped). See c:Elixir.Membrane.Element.change_playback_state/2
in the docs for play/1, prepare/1, stop/1, but change_playback_state/2 does not exist.
For two reasons:
playing state
Currently we have a manager module per element type, while the functionality of each element is very similar. We could try to define managers as handlers of particular messages/actions, such as caps, events, demands, buffers etc. This is related to the refactor of Membrane.Element.Manager.Common (#48)
We need a new type of element, something like GStreamer's bin but not the same. They should be located somewhere in between pipelines and bins.
Potential use case: a decoder that automatically creates elements decoding any known format.
handle_prepare(:stopped, state) to handle_prepare_to_play(state), similarly handle_prepare_to_stop
handle_{write,process} -> handle_*_list, e.g. handle_write -> handle_write_list
handle_{write,process}1 -> handle_*, e.g. handle_write1 -> handle_write
def_known_sink_pads -> def_sink_pads, def_known_source_pads -> def_source_pads
We need to have developer documentation that explains:
It is held in state in some elements, which unnecessarily complicates code
We should have equivalent of GenServer's terminate (e.g. handle_terminate) both in pipelines and elements to allow proper cleanup.
There are two main reasons for this:
the init_size option conflicts with handling sticky events, and resolving this conflict is difficult and would obfuscate a lot of code,
init_size is not really a responsibility of PullBuffer, and holding it there prevents one from using this functionality when a pad is in push mode, which potentially could be the case
Instead, a separate buffer for this purpose may be created, or such an option may be added to the PlaybackBuffer
Currently there's no mechanism in membrane core for audio/video synchronisation.
We need to have a uniform abstraction layer and/or API for file metadata.
It would be nice to have an API in def_options to specify which options can be updated during runtime (essentially it will allow defining "setters" and "getters")
Use case:
When linking push output and pull input pads there is no check for toilet mode which should be enabled.
When linking pull output and push input nothing will happen and no errors are thrown
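The missing checks could be sketched as a validation step run at link time. LinkCheckSketch, validate/3 and the error atoms are hypothetical and not part of the core; the sketch only encodes the two cases described above.

```elixir
defmodule LinkCheckSketch do
  # Arguments: output pad mode, input pad mode, whether toilet mode is enabled
  # on the input's buffer. Modes are :push or :pull.

  # Push output into pull input is only acceptable with toilet mode enabled.
  def validate(:push, :pull, false), do: {:error, :toilet_required}

  # Pull output into push input would never receive demands, so reject it.
  def validate(:pull, :push, _toilet?), do: {:error, :pull_output_to_push_input}

  def validate(_output_mode, _input_mode, _toilet?), do: :ok
end
```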