直播推流管理系统prtmp的开发过程总结

行云流水
2023-03-16 / 0 评论 / 343 阅读 / 正在检测是否收录...

前言

去年通过docker部署rtmp服务并进行网络压力测试,今年我们的业务终于用到了直播流服务。自己写了一个小系统用来从上游拉取直播流并分发到阿里云或者腾讯的直播服务。然后供小程序调用。特此记录整个配置过程。

直播服务

阿里云

自建直播服务


参考: centos7部署rtmp服务并进行压力测试

直播流管理系统

主要接口

推流url生成

def ali_push_url(appName, streamName):
    """
    阿里云直播服务推流url
    """
    #推流
    push_domain = 'push.mytest.com'
    push_key = 'ZtBxxxxxxxMEKW'

    #过期时间
    expire_time = 86400
    time_stamp = int(time.time()) + expire_time

    #推流url
    pstr = '/{}/{}-{}-0-0-{}'.format(appName, streamName, time_stamp, push_key)
    pmd5 = md5_sign(pstr)

    purl= 'rtmp://{}/{}/{}?auth_key={}-0-0-{}'.format(push_domain, appName, streamName, time_stamp, pmd5)

    return  purl

直播服务url

def ali_live_url(appName, streamName):
    """
    阿里云直播服务播放url
    """
    resp = {}

    #播放
    play_domain = 'live.mytest.com'
    play_key = 'ulwxxxxxxxxxOm'

    #过期时间
    expire_time = 86400
    time_stamp = int(time.time()) + expire_time

    #播流url
    rstr = '/{}/{}-{}-0-0-{}'.format(appName, streamName, time_stamp, play_key)
    fstr = '/{}/{}.flv-{}-0-0-{}'.format(appName, streamName, time_stamp, play_key)
    hstr = '/{}/{}.m3u8-{}-0-0-{}'.format(appName, streamName, time_stamp, play_key)
    rmd5 = md5_sign(rstr)
    fmd5 = md5_sign(fstr)
    hmd5 = md5_sign(hstr)
    resp['rtmp_url'] = 'rtmp://{}/{}/{}?auth_key={}-0-0-{}'.format(play_domain, appName, streamName, time_stamp, rmd5)
    resp['flv_url'] = 'http://{}/{}/{}.flv?auth_key={}-0-0-{}'.format(play_domain, appName, streamName, time_stamp, fmd5)
    resp['hls_url'] = 'http://{}/{}/{}.m3u8?auth_key={}-0-0-{}'.format(play_domain, appName, streamName, time_stamp, hmd5)

    return  resp

推流任务函数

@celery.task
def push_rtmp_task(sid, source, target):
    """
    推流任务函数
    """
    print(source)
    print(target)
    # 视频源输入参数
    input_args = ['-i', source]

    # 推流输出参数
    output_args = ['-vcodec', 'libx264', '-acodec', 'aac', '-f', 'flv', target]

    # 合并参数
    command = ['ffmpeg'] + input_args + output_args

    # 执行命令
    with subprocess.Popen(command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) as process:
        process.wait()

    # 更新推流信息
    living_info = load_json('./data/living.json')
    if sid in living_info.keys():
        living_info.pop(sid)
        save_living_info(living_info)

前端功能

手动推流

 //手动推流
    function ManPushRtmp(dom) {
        var mymessage = confirm("确认手动推送此直播流到" + $(dom).attr("ptype") + "?" + $(dom).attr("url"));
        if (mymessage == true) {
        $('#myModal').modal('show');
            $.ajax({
                url : '/api/rtmppush?url=' + $(dom).attr("url") + '&utype=' + $(dom).attr("utype") + '&sid=' + $(dom).attr("sid") + '&mid=' + $(dom).attr("mid") + '&lid=' + $(dom).attr("lid") + '&ptype=' + $(dom).attr("ptype"),
                type : 'get',
                success : function(data) {
                       $('#myModal').modal('hide');
                    if (data.code == 1000){
                        $("#child_table").bootstrapTable('refresh', data.data);
                        alert(data.msg);
                    }
                    else {
                        alert("推流失败! " + data.msg)
                    }
                },
                error : function(data){
                       $('#myModal').modal('hide');
                    alert("接口异常! " + data.msg)
                }
            });
        }
    };

停止推流

    //停止推流
    function ManStopPush(dom) {
        var mymessage = confirm("确认手动停止此直播流?" + $(dom).attr("stream_id"));
        if (mymessage == true) {
            $('#myModal').modal('show');
            $.ajax({
                url : '/api/stoppush?stream_id=' + $(dom).attr("stream_id"),
                type : 'get',
                success : function(data) {
                       $('#myModal').modal('hide');
                    if (data.code == 1000){
                        alert(data.msg);
                        location.reload();
                    }
                    else {
                        alert("失败! " + data.msg)
                    }
                },
                error : function(data){
                       $('#myModal').modal('hide');
                    alert("接口异常!" + data.msg)
                }
            });
        }
    };

异步任务

celery实例化

from celery import Celery

# celery配置
app.config['CELERY_BROKER_URL'] = 'redis://redis:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://redis:6379/0'

#celery实例化
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

caddy代理

prmtp.mytest.com {
  tls admin@mytest.com
  encode gzip
  log {
      output  file  /opt/logs/access.log
  }

  header / {
      Strict-Transport-Security "max-age=31536000;includeSubdomains;preload"
  }

  #访问认证 
  #密码:123456
  basicauth /* {
      admin  $2a$14$DIjtbTxbUSZHfHJUrjuU9.45SlrcwICIXNVSwVxehsnHhTXBBN  Nsi
  }

  ## HTTP 代理配置, 后端服务端口
  reverse_proxy  http://backend:5000
}

启动

#手动启动
python main.py
celery  -A main.celery worker  -l info

# 项目启动
docker-compose up -d

FAQ

js传递url参数需要转码

评论 (0)

取消
只有登录/注册用户才可评论