Friday, January 13, 2012

gstreamer appsrc in action

Lately I have been exploring gstreamer to play AV from a transport stream demultiplexer that I am developing (mostly for fun, slightly for work). After some research (read: googling "play video using gstreamer"), I concluded that gstreamer-appsrc is the way to go. Since I had a hard time finding a working example on the Internet for this plugin, I thought I would share my test application here. If you are new to gstreamer, keep in mind that I am only a couple of days ahead of you and I might have overlooked or missed even basic things in this post; and if you in fact know gstreamer, feel free to mention any such omissions you notice here so that I can correct them.

Here is a quick intro to gstreamer for newcomers. It is a media framework based on plugins - you can read all about gstreamer here. That page says prior knowledge of glib and gobject is assumed; while proper knowledge of these two will help you progress faster, I didn't know either and I survived. The point is, you can (and will have to) learn them in parallel. In a nutshell, you work with a gstreamer pipeline, which is basically a series of plugins where each one takes its input from the previous plugin, does something with it, and hands the output to the next one. Data is passed around in buffers through the source and sink pads of the plugins. Usually at the left end of a pipeline there is a pure source (like filesrc, which reads data from a file and feeds the pipeline) and the last element is a pure sink that plays the media. The typical steps in creating a gstreamer based player can be summarized as:
  1. Create your elements and set their properties, signal handlers and callbacks
  2. Add them to a bin (pipeline is a bin)
  3. Get a reference to the bin's bus and add a bus callback for messages - at a minimum, an application should handle error and end-of-stream messages so that it can stop the main loop on error and EOS.
  4. Link their source and sink pads in the correct order
  5. Set the state of bin to playing and start the glib main loop.
You can find a gstreamer helloworld application and its explanation here. It uses playbin, a ready-made pipeline that does most of the work for us. It is good enough as long as you are not producing any media yourself. But if you are producing media data in your application and need to play it using gstreamer, you need a way to feed that data into the pipeline. That is where appsrc comes into the picture: the appsrc element allows applications to inject buffers into a pipeline.

There is this gstreamer appsrc example code on github, but unfortunately it didn't work for me - and judging by the comments on that page, I am not the only one. It uses udpsink to stream data over a network. Since I just wanted to play the video on my desktop, I replaced it with an xvimagesink, which is used by most desktop-based gstreamer video examples. But that didn't make any difference either - whenever I ran the code, it exited with the following error:

ERROR from element mysource: Internal data flow error.
Debugging info: gstbasesrc.c(2582): gst_base_src_loop (): /GstPipeline:pipeline0/GstAppSrc:mysource:
streaming task paused, reason not-negotiated (-4)

As usual I resorted to StackOverflow for an answer, but apparently there aren't many gstreamer enthusiasts roaming around there. After earning my first tumbleweed badge for the question, I decided to figure it out myself. I started with this simple gstreamer AVI player. It uses the following pipeline:

filesrc -> avidemux -> decodebin -> customfilter -> ffmpegcolorspace -> videoscale -> autovideosink  

This pipeline is largely self-explanatory. The filesrc reads the AVI file and passes the data to the demuxer, where it gets split into audio and video. To keep things short, let us ignore audio. The video data is decoded in decodebin, and the raw data is processed further and displayed by the last three plugins (gotta admit I don't know the nitty-gritty of those three; there are detailed websites out there explaining them - help yourself). If you look at the code on that page, you can see that not all the elements are linked together at the beginning. This is because in some plugins the source (output) pads are created dynamically based on the input data. So you have to listen for the appropriate signals, like pad-added, on such elements and do the linking from the callbacks.

Since we are dealing with MPEG video, we don't need avidemux. And we can replace videoscale and autovideosink with an xvimagesink for the sake of brevity. So it comes down to replacing filesrc with an appsrc and feeding the data to the decoder. Let us create the elements:

/* Start by creating a new pipeline */
app->pipeline = (GstPipeline*)gst_pipeline_new("mypipeline");

/* Add bus callback */
bus = gst_pipeline_get_bus(app->pipeline);
gst_bus_add_watch(bus, (GstBusFunc)bus_callback, app);
gst_object_unref(bus);

/* Create the elements */
app->src = (GstAppSrc*)gst_element_factory_make("appsrc", "mysrc");
app->decoder = gst_element_factory_make("decodebin", "mydecoder");
app->ffmpeg = gst_element_factory_make("ffmpegcolorspace", "myffmpeg");
app->xvimagesink = gst_element_factory_make("xvimagesink", "myvsink");

/* Add signal handlers on appsrc - the element must be created
   before we can connect to its signals */
g_signal_connect(app->src, "need-data", G_CALLBACK(start_feed), app);
g_signal_connect(app->src, "enough-data", G_CALLBACK(stop_feed), app);

/* Add elements to the pipeline */
gst_bin_add_many(GST_BIN(app->pipeline), (GstElement*)app->src, app->decoder, app->ffmpeg, app->xvimagesink, NULL);

/* Link them together - this should be done after adding to the bin */

if(!gst_element_link((GstElement*)app->src, app->decoder)){
    g_warning("failed to link src and decoder");
}

if(!gst_element_link(app->ffmpeg, app->xvimagesink)){
    g_warning("failed to link ffmpeg and xvsink");
}

/* Note that we haven't linked decoder to color-space element 
- that's done from pad-added signal callback. */
g_signal_connect(app->decoder, "pad-added", G_CALLBACK(on_pad_added), app->decoder);


Applications can feed data to the appsrc plugin in two ways: by calling the gst_app_src_push_buffer function or by emitting the push-buffer action signal. I used the first approach, since I am not fluent with signals and the signal-based version gave me intermittent warning messages while running. We start by writing a read_data function that reads a chunk of data from a global file pointer, wraps it in a GstBuffer, and pushes it to the appsrc.

static gboolean read_data(gst_app_t *app)
{
    GstBuffer *buffer;
    guint8 *ptr;
    gint size;
    GstFlowReturn ret;

    ptr = g_malloc(BUFF_SIZE);
    g_assert(ptr);

    size = fread(ptr, 1, BUFF_SIZE, app->file);
    
    if(size == 0){
        ret = gst_app_src_end_of_stream(app->src);
        g_debug("eos returned %d at %d\n", ret, __LINE__);
        return FALSE;
    }

    buffer = gst_buffer_new();
    GST_BUFFER_MALLOCDATA(buffer) = ptr;
    GST_BUFFER_SIZE(buffer) = size;
    GST_BUFFER_DATA(buffer) = GST_BUFFER_MALLOCDATA(buffer);

    ret = gst_app_src_push_buffer(app->src, buffer);

    if(ret !=  GST_FLOW_OK){
        g_debug("push buffer returned %d for %d bytes \n", ret, size);
        return FALSE;
    }

    if(size != BUFF_SIZE){
        ret = gst_app_src_end_of_stream(app->src);
        g_debug("eos returned %d at %d\n", ret, __LINE__);
        return FALSE;
    }

    return TRUE;
}

The appsrc element emits two main signals, need-data and enough-data, to tell the application to start and stop feeding data, and we need to connect to them. In the need-data callback, we add our read_data function as an idle handler on the main loop; the glib main loop will then call it whenever the loop is otherwise idle. When appsrc emits the enough-data signal, we simply remove the idle handler so that it is not called anymore. The boolean return value of read_data is used by the main loop to decide whether to keep calling it - so we return FALSE on any error while pushing data, or when we reach end of file.

static void start_feed (GstElement * pipeline, guint size, gst_app_t *app)
{
    if (app->sourceid == 0) {
        /* add idle handle to the main loop */
        app->sourceid = g_idle_add ((GSourceFunc) read_data, app);
    }
}

static void stop_feed (GstElement * pipeline, gst_app_t *app)
{
    if (app->sourceid != 0) {
        GST_DEBUG ("stop feeding");
        g_source_remove (app->sourceid);
        app->sourceid = 0;
    }
}

Now, start the pipeline, create a main loop and run it:
gst_element_set_state((GstElement*)app->pipeline, GST_STATE_PLAYING);
app->loop = g_main_loop_new(NULL, FALSE);
g_main_loop_run(app->loop);

/* the previous call will return when we call quit from bus_callback - 
set the pipeline to null to do the cleanup */
gst_element_set_state((GstElement*)app->pipeline, GST_STATE_NULL);


And that's it - we've got appsrc working. The source should be compiled with the appropriate cflags and libs; you can use pkg-config to get them:

gcc -o testapp gst-testapp.c `pkg-config --cflags --libs  gstreamer-0.10 gstreamer-app-0.10`

If you get compiler or linker errors related to gstreamer, you probably don't have the necessary gstreamer development libraries installed. You can apt-get them using:

sudo apt-get install libgstreamer0.10-dev
sudo apt-get install libgstreamer-plugins-base0.10-dev

Here is the full source code.

#include <stdio.h>
#include <gst/gst.h>
#include <gst/app/gstappsrc.h>

typedef struct {
 GstPipeline *pipeline;
 GstAppSrc *src;
 GstElement *sink;
 GstElement *decoder;
 GstElement *ffmpeg;
 GstElement *xvimagesink;
 GMainLoop *loop;
 guint sourceid;
 FILE *file;
}gst_app_t;

static gst_app_t gst_app;

#define BUFF_SIZE (1024)

static gboolean read_data(gst_app_t *app)
{
 GstBuffer *buffer;
 guint8 *ptr;
 gint size;
 GstFlowReturn ret;

 ptr = g_malloc(BUFF_SIZE);
 g_assert(ptr);

 size = fread(ptr, 1, BUFF_SIZE, app->file);
 
 if(size == 0){
  ret = gst_app_src_end_of_stream(app->src);
  g_debug("eos returned %d at %d\n", ret, __LINE__);
  return FALSE;
 }

 buffer = gst_buffer_new();
 GST_BUFFER_MALLOCDATA(buffer) = ptr;
 GST_BUFFER_SIZE(buffer) = size;
 GST_BUFFER_DATA(buffer) = GST_BUFFER_MALLOCDATA(buffer);

 ret = gst_app_src_push_buffer(app->src, buffer);

 if(ret !=  GST_FLOW_OK){
  g_debug("push buffer returned %d for %d bytes \n", ret, size);
  return FALSE;
 }

 if(size != BUFF_SIZE){
  ret = gst_app_src_end_of_stream(app->src);
  g_debug("eos returned %d at %d\n", ret, __LINE__);
  return FALSE;
 }

 return TRUE;
}

static void start_feed (GstElement * pipeline, guint size, gst_app_t *app)
{
 if (app->sourceid == 0) {
  GST_DEBUG ("start feeding");
  app->sourceid = g_idle_add ((GSourceFunc) read_data, app);
 }
}

static void stop_feed (GstElement * pipeline, gst_app_t *app)
{
 if (app->sourceid != 0) {
  GST_DEBUG ("stop feeding");
  g_source_remove (app->sourceid);
  app->sourceid = 0;
 }
}

static void on_pad_added(GstElement *element, GstPad *pad)
{
 GstCaps *caps;
 GstStructure *str;
 gchar *name;
 GstPad *ffmpegsink;
 GstPadLinkReturn ret;

 g_debug("pad added");

 caps = gst_pad_get_caps(pad);
 str = gst_caps_get_structure(caps, 0);

 g_assert(str);

 name = (gchar*)gst_structure_get_name(str);

 g_debug("pad name %s", name);

 if(g_strrstr(name, "video")){

  ffmpegsink = gst_element_get_pad(gst_app.ffmpeg, "sink");
  g_assert(ffmpegsink);
  ret = gst_pad_link(pad, ffmpegsink);
  g_debug("pad_link returned %d\n", ret);
  gst_object_unref(ffmpegsink);
 }
 gst_caps_unref(caps);
}

static gboolean bus_callback(GstBus *bus, GstMessage *message, gpointer ptr)
{
 gst_app_t *app = (gst_app_t*)ptr;

 switch(GST_MESSAGE_TYPE(message)){

 case GST_MESSAGE_ERROR:{
  gchar *debug;
  GError *err;

  gst_message_parse_error(message, &err, &debug);
  g_print("Error %s\n", err->message);
  g_error_free(err);
  g_free(debug);
  g_main_loop_quit(app->loop);
 }
 break;

 case GST_MESSAGE_WARNING:{
  gchar *debug;
  GError *err;
  gchar *name;

  gst_message_parse_warning(message, &err, &debug);
  g_print("Warning %s\nDebug %s\n", err->message, debug);

  name = GST_MESSAGE_SRC_NAME(message);

  g_print("Name of src %s\n", name ? name : "nil");
  g_error_free(err);
  g_free(debug);
 }
 break;

 case GST_MESSAGE_EOS:
  g_print("End of stream\n");
  g_main_loop_quit(app->loop);
  break;

 case GST_MESSAGE_STATE_CHANGED:
  break;

 default:
  g_print("got message %s\n", \
   gst_message_type_get_name (GST_MESSAGE_TYPE (message)));
  break;
 }

 return TRUE;
}

int main(int argc, char *argv[])
{
 gst_app_t *app = &gst_app;
 GstBus *bus;
 GstStateChangeReturn state_ret;

 if(argc != 2){
  printf("File name not specified\n");
  return 1;
 }

 app->file = fopen(argv[1], "rb");

 g_assert(app->file);

 gst_init(NULL, NULL);

 app->pipeline = (GstPipeline*)gst_pipeline_new("mypipeline");
 bus = gst_pipeline_get_bus(app->pipeline);
 gst_bus_add_watch(bus, (GstBusFunc)bus_callback, app);
 gst_object_unref(bus);

 app->src = (GstAppSrc*)gst_element_factory_make("appsrc", "mysrc");
 app->decoder = gst_element_factory_make("decodebin", "mydecoder");
 app->ffmpeg = gst_element_factory_make("ffmpegcolorspace", "myffmpeg");
 app->xvimagesink = gst_element_factory_make("xvimagesink", "myvsink");

 g_assert(app->src);
 g_assert(app->decoder);
 g_assert(app->ffmpeg);
 g_assert(app->xvimagesink);

 g_signal_connect(app->src, "need-data", G_CALLBACK(start_feed), app);
 g_signal_connect(app->src, "enough-data", G_CALLBACK(stop_feed), app);
 g_signal_connect(app->decoder, "pad-added", 
  G_CALLBACK(on_pad_added), app->decoder);

 gst_bin_add_many(GST_BIN(app->pipeline), (GstElement*)app->src, 
  app->decoder, app->ffmpeg, app->xvimagesink, NULL);

 if(!gst_element_link((GstElement*)app->src, app->decoder)){
  g_warning("failed to link src and decoder");
 }

 if(!gst_element_link(app->ffmpeg, app->xvimagesink)){
  g_warning("failed to link ffmpeg and xvsink");
 }

 state_ret = gst_element_set_state((GstElement*)app->pipeline, GST_STATE_PLAYING);
 g_warning("set state returned %d\n", state_ret);

 app->loop = g_main_loop_new(NULL, FALSE);

 g_main_loop_run(app->loop);

 state_ret = gst_element_set_state((GstElement*)app->pipeline, GST_STATE_NULL);
 g_warning("set state null returned %d\n", state_ret);

 return 0;
}

This just shows appsrc in action - the real work is integrating it with your application. Remember that g_main_loop_run returns only when you call g_main_loop_quit, so you may want to run the main loop in a thread of its own.

If you find it helpful or wrong, please share your feedback.

20 comments:

  1. Hello,

    I need to open this source with gstreamer:
    ( echo "--video boundary--"; curl http://user:psw@ip:port/MJPEG.CGI ).

    Currently I use a piped shell command like this:
    ( echo "--video boundary--"; curl http://user:psw@ip:port/MJPEG.CGI )| gst-launch fdsrc
    ! multipartdemux boundary="video boundary--" ! decodebin2 name=d ! ffmux_flv name=mux ! filesink location=output.flv d. ! queue ! videoscale ! videorate ! video/x-raw-yuv,width=640,height=480,framerate=10/1 ! ffenc_flv ! mux.

    to decode the stream and save it to an FLV file.

    I have not found a way to replicate this with C/C++ code, and I think appsrc may let me do it.

    Any suggestions?

    The stream comes from an IP camera, where I need to add the boundary header.

    Thanks,

    Oscar

  2. Thank you very much. This description was very helpful for me.

  3. Hi,

    Thanks for the detailed explanation.

    I am currently writing an application using appsrc (input) and alsasink (output) for playing audio samples (raw PCM: 44.1 kHz, 16 bits per sample, stereo) which are present in a global buffer.

    Can you please help me in creating the pipeline.

    Thanks for the help.

  4. Could you please tell me what is the purpose of the "static void on_pad_added(GstElement *element, GstPad *pad)" callback function?
    Thanks!

  5. I just built your test app on Mac.


    app->decoder = gst_element_factory_make("decodebin", "mydecoder");


    produces a null pointer, but

    app->decoder = gst_element_factory_make("decodebin2", "mydecoder");

    works fine. Any idea why I wouldn't have decodebin?

    1. decodebin has been replaced with decodebin2; your system probably has only decodebin2 installed.

  6. Hi Amarghosh ,
    First of all, thanks a lot for writing this post... it's really helpful for me as a new user :)
    I just started learning Gstreamer.
    My confusion is that you start by writing a read_data function that reads chunks of data from a global file pointer and creates a GstBuffer and pushes them to the appsrc.
    Also, at line 173 you are opening a file - the filename comes from the command line - but what I intend is to read from a buffer instead of a file.
    I have a char buffer which contains H.264, so how am I going to use the above code?
    Your help will be highly appreciated :)

    1. The file is used here only to demonstrate how to feed your own buffer to appsrc. You can assign a pointer to your data buffer as the `buffer` value of the GstBuffer passed to the gst_app_src_push_buffer function.

      buffer = gst_buffer_new();
      GST_BUFFER_DATA(buffer) = pointer_to_mp4_buffer;
      GST_BUFFER_SIZE(buffer) = size;
      /*
      next line is required only if you want gstreamer
      to free the buffer once it is done with it
      */
      GST_BUFFER_MALLOCDATA(buffer) = pointer_to_mp4_buffer;

      Hope this helps.

  7. Hello Amarghosh,
    Thanks for the reply.
    So, just to confirm: I have a buffer int stream_buffer[1500] which is holding the H.264 data, and I am providing the buffer as follows:
    buffer = gst_buffer_new();
    GST_BUFFER_DATA(buffer) = stream_buffer;
    GST_BUFFER_SIZE(buffer) = size;
    ret = gst_app_src_push_buffer(app->src, buffer);
    GST_BUFFER_MALLOCDATA(buffer) = stream_buffer;
    Also, in my main function I have commented out the following lines
    if(argc != 2){
    printf("File name not specified\n");
    return 1;
    }

    app->file = fopen(argv[1], "r");

    g_assert(app->file);
    and starting with gst_init(NULL, NULL);
    I am using GStreamer on Windows and have replaced decodebin with decodebin2 and xvimagesink with autovideosink, but still I am not able to get the video.
    Please suggest me with your knowledge.
    Thanks in Advance :)

  8. 1. Is there any reason why the buffer is an int array instead of a char array? A buffer is typically a byte stream, which should be represented as a char array. Gstreamer expects a stream of bytes, not 4-byte words.
    2. If your buffer is an array, you don't need the GST_BUFFER_MALLOCDATA line. That line tells gstreamer to free() the buffer once it is done with it.
    3. You can replace xvimagesink with autovideosink, but decodebin is required.

    If you write your buffer to a file, are you able to play that file with a normal player, say, vlc? If not, the content may not be in the correct format.

  9. This comment has been removed by the author.

  10. 1. Yes, the buffer is a char buffer. I made it an int buffer intentionally just to experiment with a few things, but basically it is char.
    2. OK, I will not use the GST_BUFFER_MALLOCDATA line.
    3. In your third-to-last reply you mentioned that decodebin was replaced by decodebin2; that's the reason I replaced it. Also, the program was failing at the line g_assert(app->decoder); but once decodebin was replaced with decodebin2 in the following line, the application worked fine.
    app->decoder = gst_element_factory_make("decodebin2", "mydecoder");

    Now, as you have suggested, I am trying to write the buffer to a file and then play it with a media player. Can you suggest a good link to follow for writing a buffer to a media file?

    Thanks a lot for your help :)

  11. Nice Work.
    I have replaced decodebin with decodebin2 and xvimagesink with autovideosink to make it work on my machine.
    Though I still get this warning at the start - WARNING **: set state returned 2
    and at the end - WARNING **: set state null returned 1


    Any inputs ?

    1. @Amarghosh Thanks a lot for your explanation...
      I tried with your help and now I am able to decode my stream.

      Thanks again :)

      and yes I am also getting the warning as WARNING **: set state null returned 1

    2. Since in GStreamer 1.0 macros like GST_BUFFER_DATA and GST_BUFFER_MALLOCDATA are removed, how can we access the buffer?

    3. Use the gst_buffer_new_wrapped or gst_buffer_new_wrapped_full functions. They differ in how the memory is freed once the buffer is released. Read the docs for more info: http://gstreamer.freedesktop.org/data/doc/gstreamer/head/gstreamer/html/gstreamer-GstBuffer.html

    4. @Cigara,
      Not sure if you are still tracking this, but as shown on this page http://gstreamer.freedesktop.org/data/doc/gstreamer/head/gstreamer/html/GstElement.html#GstStateChangeReturn 1 and 2 are not errors; only zero (GST_STATE_CHANGE_FAILURE) is an error!

  12. I'm getting an assertion failed at this point -
    app->file = fopen(argv[1], "r");
    g_assert(app->file);

    What could the reason be?

      Check your file path. The app should be invoked with the file name as the argument.
      And that assert should ideally be g_assert(app->file != NULL)
