前言
安卓手机默认打开了通话录音功能,几年下来积攒了上千条录音,一直懒得清理。最近写了一个管理系统,将所有录音文件导入。进行可视化分析,给自己几年打的所有电话生成一份报告。更直观的展示自己的通讯情况。
开发过程
录音文件的管理通过django框架开发,主要功能点有通讯录管理、录音文件管理、录音文件转文字管理。转文字通过调用腾讯api完成,将结果保存到数据库,便于查询。可视化模块通过grafana直接读取mysql数据实现。
模型类的设计
模型类包括三种,Contact类、CallRecord类和RecordResult类。
Contact类
存储通讯录信息,包含名称和号码等
class Contact(models.Model):
number = models.CharField(max_length=15,verbose_name='电话号码') # 电话号码
name = models.CharField(blank=True,null=True,max_length=100,verbose_name='联系人') # 联系人姓名
# 新增的类型字段
TYPE_CHOICES = [
('family', '亲朋'),
('work', '工作'),
('promotion', '推广'),
('taxi', '滴滴'),
('service', '客服'),
('delivery', '快递'),
]
contact_type = models.CharField(
max_length=10,
choices=TYPE_CHOICES,
verbose_name='类型',
default='promotion', # 默认值为'亲朋'
)
def __str__(self):
return self.number
class Meta:
verbose_name = '通讯录'
verbose_name_plural = '通讯录'
CallRecord类
用于存储音频文件、状态、音频转文字的任务信息等
class CallRecord(models.Model):
phone_number = models.ForeignKey(Contact, related_name='call_records', on_delete=models.CASCADE) # 电话号码外键
call_time = models.DateTimeField(verbose_name='时间') # 通话时间
recording_file = models.CharField(max_length=255, blank=True, null=True, verbose_name='文件名') # 录音文件名
notes = models.TextField(blank=True, null=True, verbose_name='备注') # 备注
task_id = models.CharField(max_length=255, blank=True, null=True, verbose_name='任务id') # 任务ID,可为空
# 状态字段的选择
STATUS_CHOICES = [
('未处理', '未处理'), # Unprocessed
('处理中', '处理中'), # Processing
('已完成', '已完成'), # Completed
]
status = models.CharField(max_length=10, choices=STATUS_CHOICES, default='未处理', verbose_name='状态') # 状态,默认值为 '未处理'
def __str__(self):
return f"{self.phone_number.number} - {self.call_time}" # 返回通话记录的字符串表示
class Meta:
verbose_name = '通话录音' # 该模型的单数名称
verbose_name_plural = '通话录音' # 该模型的复数名称
RecordResult
用于管理存储音频转文字的结果等信息
class RecordResult(models.Model):
call_record = models.OneToOneField(CallRecord, related_name='record_result', on_delete=models.CASCADE) # 与 CallRecord 的一对一关系
# 录音时长,单位为秒
duration = models.PositiveIntegerField(verbose_name='时长') # 时长,正整数
# 错误信息,可以为空
error_message = models.TextField(blank=True, null=True, verbose_name='错误') # 错误信息,可为空值
# 文本结果,可以为空
text_result = models.TextField(blank=True, null=True, verbose_name='全文') # 文本结果,可为空
# 文本大纲,可以为空
text_outline = models.TextField(blank=True, null=True, verbose_name='大纲') # 文本大纲,可为空
# 标签,可以为空
tags = models.CharField(max_length=255, blank=True, null=True, verbose_name='标签') # 标签,最多 255 字符,可以为空
# 备注,可以为空
notes = models.TextField(blank=True, null=True, verbose_name='备注') # 备注,可为空
def __str__(self):
return f"{self.call_record.phone_number.number} - {self.duration}"
class Meta:
verbose_name = '通话文本' # 模型的单数名称
verbose_name_plural = '通话文本' # 模型的复数名称
接口设计
录音文件入库、文本结果处理等任务过程中用到的各种接口。
# 防止重复入库的接口
class CheckRecordingFile(APIView):
def post(self, request):
# 获取file_name参数
file_name = request.data.get('file_name')
if not file_name:
return Response({"error": "file_name is required"}, status=status.HTTP_400_BAD_REQUEST)
# 查询CallRecord中是否有这个file_name
record_exists = CallRecord.objects.filter(recording_file=file_name).exists()
# 根据查询结果返回True或False
if record_exists:
return Response({"exists": True}, status=status.HTTP_200_OK)
else:
return Response({"exists": False}, status=status.HTTP_200_OK)
#录音在线播放用到的接口
class AudioList(APIView):
def get(self, request):
mediaList = []
# 获取URL查询字符串中的rid参数
rid = request.query_params.get('rid')
cid = request.query_params.get('cid')
if rid:
mediaList = CallRecord.objects.filter(id=rid)
if cid:
contact = Contact.objects.get(id=cid)
mediaList = CallRecord.objects.filter(phone_number=contact)
arr = []
#倒序
for item in mediaList[::-1]:
# 随机1-10专辑封面图片
sui_num = random.randint(1, 10)
#构建
arr.append({
'id': item.id,
'title': f"{item.record_result.id} - {item.phone_number.number} - {item.call_time}",
'singer': f"{item.phone_number.name}",
'songUrl': f"{settings.MEDIA_URL}{urllib.parse.quote(item.recording_file)}",
'imageUrl': '/static/images/' + str(sui_num) + '.png',
})
return Response({'list': arr}, status=status.HTTP_201_CREATED)
录音文件同步
手机中的通讯录音会自动传输到家庭nfs,管理系统会单独启动一个循环任务去nfs拉取音频文件入库并创建音频转文字任务。
import subprocess
import time
from datetime import datetime
# 定义需要执行的命令
commands = [
"mkdir -p /tmp/lxnfs",
"mount -t smbfs //189xxxxx805:zhixxxx6@192.168.1.150/7460088 /tmp/lxnfs",
"cp -n /tmp/lxnfs/来自ADT-AN00的手机备份/文件夹备份/* /Users/xinei/project/audioman/data/files/ || true",
"umount /tmp/lxnfs"
]
# 定义一个函数来执行这些命令
def run_commands():
for command in commands:
try:
# 执行每条命令
subprocess.run(command, shell=True, check=True)
print(f"执行成功: {command}")
except subprocess.CalledProcessError as e:
print(f"命令执行失败: {command}\n错误信息: {e}")
# 主循环,每小时检查一次时间
while True:
current_time = datetime.now()
# 只在 20:00 到 24:00 之间执行命令
if current_time.hour >= 20 and current_time.hour < 23:
print(f"当前时间: {current_time}. 在允许的时间范围内,执行命令。")
run_commands()
else:
print(f"当前时间: {current_time}. 不在允许的时间范围内,跳过执行。")
# 等待 1 小时再检查时间
time.sleep(3600)
录音文件转文字
录音文件写入数据库后,默认状态为待处理。另一个脚本会自动扫描未处理的记录,然后自动创建处理任务。
# 监控指定目录
def monitor_directory(path):
observer = None
event_handler = MyEventHandler()
try:
while True:
current_time = datetime.now()
# 只在20:00到24:00之间执行监控
if current_time.hour >= 20 and current_time.hour < 24:
if observer is None: # 只有在 observer 没有启动时才创建新的观察者
observer = Observer()
observer.schedule(event_handler, path, recursive=False)
observer.start()
print(f"当前时间: {current_time}. 启动监控。")
else:
if observer is not None:
observer.stop()
observer.join() # 等待线程停止
observer = None # 将 observer 置为 None,以便后续创建新的实例
print(f"当前时间: {current_time}. 停止监控。")
time.sleep(3600) # 每小时检查一次时间
time.sleep(600) # 每次监控状态保持10分钟,然后再循环检查
except KeyboardInterrupt:
if observer is not None:
observer.stop()
observer.join()
if __name__ == "__main__":
directory_to_watch = "/files/" # 替换为你要监控的目录
monitor_directory(directory_to_watch)
可视化过程
可视化通过grafana实现。直接链接mysql数据库,通过sql查询数据并返回,具体页面如开头所示。
完整项目代码获取
【统计分析】基于Django开发的录音管理系统源码
评论 (0)