From 876720f9e27fc4afdb855257e4118d7735a4bf4a Mon Sep 17 00:00:00 2001 From: philippe44 Date: Sun, 18 Aug 2019 17:38:48 -0700 Subject: [PATCH] AirPlay progress with synchro --- components/raop/raop.c | 20 +- components/raop/raop_sink.c | 6 +- components/raop/raop_sink.h | 2 +- components/raop/rtp.c | 236 +++++------------------ components/raop/rtp.h | 8 +- components/squeezelite/buffer.c | 5 + components/squeezelite/decode_external.c | 46 ++++- components/squeezelite/squeezelite.h | 1 + 8 files changed, 108 insertions(+), 216 deletions(-) diff --git a/components/raop/raop.c b/components/raop/raop.c index 5cfbfe09..9927a710 100644 --- a/components/raop/raop.c +++ b/components/raop/raop.c @@ -53,7 +53,6 @@ typedef struct raop_ctx_s { int sock; // socket of the above struct in_addr peer; // IP of the iDevice (airplay sender) bool running; - bool drift; #ifdef WIN32 pthread_t thread, search_thread; #else @@ -141,8 +140,7 @@ struct raop_ctx_s *raop_create(struct in_addr host, char *name, ctx->sock = socket(AF_INET, SOCK_STREAM, 0); ctx->cmd_cb = cmd_cb; ctx->data_cb = data_cb; - ctx->drift = false; - ctx->latency = min(latency, 44100); + ctx->latency = min(latency, 88200); if (ctx->sock == -1) { LOG_ERROR("Cannot create listening socket", NULL); free(ctx); @@ -464,9 +462,8 @@ static bool handle_rtsp(raop_ctx_t *ctx, int sock) if ((p = strcasestr(buf, "timing_port")) != NULL) sscanf(p, "%*[^=]=%hu", &tport); if ((p = strcasestr(buf, "control_port")) != NULL) sscanf(p, "%*[^=]=%hu", &cport); - rtp = rtp_init(ctx->peer, false, ctx->drift, true, ctx->latency, - ctx->rtsp.aeskey, ctx->rtsp.aesiv, ctx->rtsp.fmtp, - cport, tport, ctx->data_cb); + rtp = rtp_init(ctx->peer, ctx->latency, ctx->rtsp.aeskey, ctx->rtsp.aesiv, + ctx->rtsp.fmtp, cport, tport, ctx->cmd_cb, ctx->data_cb); ctx->rtp = rtp.ctx; @@ -493,10 +490,9 @@ static bool handle_rtsp(raop_ctx_t *ctx, int sock) kd_add(resp, "Audio-Latency", latency); } - if ((buf = kd_lookup(headers, "RTP-Info")) != NULL) { - if ((p = strcasestr(buf, "seq")) != NULL) sscanf(p, "%*[^=]=%hu", &seqno); - if ((p = strcasestr(buf, "rtptime")) != NULL) sscanf(p, "%*[^=]=%u", &rtptime); - } + buf = kd_lookup(headers, "RTP-Info"); + if (buf && (p = strcasestr(buf, "seq")) != NULL) sscanf(p, "%*[^=]=%hu", &seqno); + if (buf && (p = strcasestr(buf, "rtptime")) != NULL) sscanf(p, "%*[^=]=%u", &rtptime); if (ctx->rtp) rtp_record(ctx->rtp, seqno, rtptime); @@ -537,7 +533,7 @@ static bool handle_rtsp(raop_ctx_t *ctx, int sock) } if (!strcmp(method, "SET_PARAMETER")) { char *p; - if ((p = strcasestr(body, "volume")) != NULL) { + if (body && (p = strcasestr(body, "volume")) != NULL) { float volume; sscanf(p, "%*[^:]:%f", &volume); @@ -546,7 +542,7 @@ static bool handle_rtsp(raop_ctx_t *ctx, int sock) ctx->cmd_cb(RAOP_VOLUME, &volume); } /* - if (((p = kd_lookup(headers, "Content-Type")) != NULL) && !strcasecmp(p, "application/x-dmap-tagged")) { + if (body && ((p = kd_lookup(headers, "Content-Type")) != NULL) && !strcasecmp(p, "application/x-dmap-tagged")) { struct metadata_s metadata; dmap_settings settings = { NULL, NULL, NULL, NULL, NULL, NULL, NULL, on_dmap_string, NULL, diff --git a/components/raop/raop_sink.c b/components/raop/raop_sink.c index 3e79f6aa..befeeca3 100644 --- a/components/raop/raop_sink.c +++ b/components/raop/raop_sink.c @@ -42,7 +42,7 @@ void raop_sink_init(raop_cmd_cb_t cmd_cb, raop_data_cb_t data_cb) { tcpip_adapter_get_hostname(TCPIP_ADAPTER_IF_STA, &hostname); host.s_addr = ipInfo.ip.addr; - //initialize mDNS + // initialize mDNS ESP_ERROR_CHECK( mdns_init() ); ESP_ERROR_CHECK( mdns_hostname_set(hostname) ); @@ -54,10 +54,10 @@ void raop_sink_init(raop_cmd_cb_t cmd_cb, raop_data_cb_t data_cb) { ESP_LOGI(TAG, "mdns hostname set to: [%s] with servicename %s", hostname, sink_name); - //initialize service + // create RAOP instance, latency is set by controller uint8_t mac[6]; esp_read_mac(mac, ESP_MAC_WIFI_STA); - raop = raop_create(host, sink_name, mac, 44100, cmd_cb, data_cb); + raop = raop_create(host, sink_name, mac, 0, cmd_cb, data_cb); } /**************************************************************************************** diff --git a/components/raop/raop_sink.h b/components/raop/raop_sink.h index b60a8ed0..7ff1001a 100644 --- a/components/raop/raop_sink.h +++ b/components/raop/raop_sink.h @@ -11,7 +11,7 @@ #include -typedef enum { RAOP_STREAM, RAOP_PLAY, RAOP_FLUSH, RAOP_PAUSE, RAOP_STOP, RAOP_VOLUME } raop_event_t ; +typedef enum { RAOP_STREAM, RAOP_PLAY, RAOP_FLUSH, RAOP_PAUSE, RAOP_STOP, RAOP_VOLUME, RAOP_TIMING } raop_event_t ; typedef void (*raop_cmd_cb_t)(raop_event_t event, void *param); typedef void (*raop_data_cb_t)(const u8_t *data, size_t len); diff --git a/components/raop/rtp.c b/components/raop/rtp.c index 89d2ae9f..2ab3424d 100644 --- a/components/raop/rtp.c +++ b/components/raop/rtp.c @@ -41,6 +41,7 @@ #include "platform.h" #include "rtp.h" +#include "raop_sink.h" #include "log_util.h" #include "util.h" @@ -63,16 +64,13 @@ #define MS2TS(ms, rate) ((((u64_t) (ms)) * (rate)) / 1000) #define TS2MS(ts, rate) NTP2MS(TS2NTP(ts,rate)) - #define GAP_THRES 8 - #define GAP_COUNT 20 - -extern log_level raop_loglevel; -static log_level *loglevel = &raop_loglevel; + extern log_level raop_loglevel; + static log_level *loglevel = &raop_loglevel; //#define __RTP_STORE // default buffer size -#define BUFFER_FRAMES ( (120 * 44100) / (352 * 100) ) +#define BUFFER_FRAMES ( (120 * 88200) / (352 * 100) ) #define MAX_PACKET 1408 #define RTP_SYNC (0x01) @@ -103,7 +101,8 @@ typedef struct rtp_s { #else mbedtls_aes_context aes; #endif - bool decrypt, range; + bool decrypt; + u8_t *decrypt_buf; int frame_size, frame_duration; int in_frames, out_frames; struct in_addr host; @@ -113,15 +112,11 @@ typedef struct rtp_s { int sock; } rtp_sockets[3]; // data, control, timing struct timing_s { - bool drift; u64_t local, remote; - u32_t count, gap_count; - s64_t gap_sum, gap_adjust; } timing; struct { u32_t rtp, time; u8_t status; - bool first, required; } synchro; struct { u32_t time; @@ -129,9 +124,10 @@ typedef struct rtp_s { u32_t rtptime; } record; int latency; // rtp hold depth in samples - u32_t resent_frames; // total recovered frames + u32_t resent_req, resent_rec; // total resent + recovered frames u32_t silent_frames; // total silence frames u32_t filled_frames; // silence frames in current silence episode + u32_t discarded; int skip; // number of frames to skip to keep sync alignement abuf_t audio_buffer[BUFFER_FRAMES]; seq_t ab_read, ab_write; @@ -144,7 +140,9 @@ typedef struct rtp_s { alac_file *alac_codec; int flush_seqno; bool playing; - rtp_data_cb_t callback; + raop_data_cb_t data_cb; + raop_cmd_cb_t cmd_cb; +u16_t syncS, syncN; } rtp_t; @@ -192,10 +190,9 @@ static alac_file* alac_init(int fmtp[32]) { } /*---------------------------------------------------------------------------*/ -rtp_resp_t rtp_init(struct in_addr host, bool sync, bool drift, bool range, - int latency, char *aeskey, char *aesiv, char *fmtpstr, +rtp_resp_t rtp_init(struct in_addr host, int latency, char *aeskey, char *aesiv, char *fmtpstr, short unsigned pCtrlPort, short unsigned pTimingPort, - rtp_data_cb_t callback) + raop_cmd_cb_t cmd_cb, raop_data_cb_t data_cb) { int i = 0; char *arg; @@ -208,15 +205,13 @@ rtp_resp_t rtp_init(struct in_addr host, bool sync, bool drift, bool range, ctx->host = host; ctx->decrypt = false; - ctx->callback = callback; + ctx->cmd_cb = cmd_cb; + ctx->data_cb = data_cb; ctx->rtp_host.sin_family = AF_INET; ctx->rtp_host.sin_addr.s_addr = INADDR_ANY; pthread_mutex_init(&ctx->ab_mutex, 0); ctx->flush_seqno = -1; ctx->latency = latency; - ctx->synchro.required = sync; - ctx->timing.drift = drift; - ctx->range = range; // write pointer = last written, read pointer = next to read so fill = w-r+1 ctx->ab_read = ctx->ab_write + 1; @@ -238,6 +233,7 @@ rtp_resp_t rtp_init(struct in_addr host, bool sync, bool drift, bool range, mbedtls_aes_setkey_dec(&ctx->aes, (unsigned char*) aeskey, 128); #endif ctx->decrypt = true; + ctx->decrypt_buf = malloc(MAX_PACKET); } memset(fmtp, 0, sizeof(fmtp)); @@ -301,6 +297,7 @@ void rtp_end(rtp_t *ctx) delete_alac(ctx->alac_codec); + if (ctx->decrypt_buf) free(ctx->decrypt_buf); buffer_release(ctx->audio_buffer); free(ctx); @@ -324,7 +321,6 @@ bool rtp_flush(rtp_t *ctx, unsigned short seqno, unsigned int rtptime) buffer_reset(ctx->audio_buffer); ctx->playing = false; ctx->flush_seqno = seqno; - ctx->synchro.first = false; pthread_mutex_unlock(&ctx->ab_mutex); } @@ -376,7 +372,6 @@ static int seq_order(seq_t a, seq_t b) { /*---------------------------------------------------------------------------*/ static void alac_decode(rtp_t *ctx, s16_t *dest, char *buf, int len, int *outsize) { - unsigned char packet[MAX_PACKET]; unsigned char iv[16]; int aeslen; assert(len<=MAX_PACKET); @@ -385,12 +380,12 @@ static void alac_decode(rtp_t *ctx, s16_t *dest, char *buf, int len, int *outsiz aeslen = len & ~0xf; memcpy(iv, ctx->aesiv, sizeof(iv)); #ifdef WIN32 - AES_cbc_encrypt((unsigned char*)buf, packet, aeslen, &ctx->aes, iv, AES_DECRYPT); + AES_cbc_encrypt((unsigned char*)buf, ctx->decrypt_buf, aeslen, &ctx->aes, iv, AES_DECRYPT); #else - mbedtls_aes_crypt_cbc(&ctx->aes, MBEDTLS_AES_DECRYPT, aeslen, iv, (unsigned char*) buf, packet); + mbedtls_aes_crypt_cbc(&ctx->aes, MBEDTLS_AES_DECRYPT, aeslen, iv, (unsigned char*) buf, ctx->decrypt_buf); #endif - memcpy(packet+aeslen, buf+aeslen, len-aeslen); - decode_frame(ctx->alac_codec, packet, dest, outsize); + memcpy(ctx->decrypt_buf+aeslen, buf+aeslen, len-aeslen); + decode_frame(ctx->alac_codec, ctx->decrypt_buf, dest, outsize); } else decode_frame(ctx->alac_codec, (unsigned char*) buf, dest, outsize); } @@ -398,19 +393,21 @@ static void alac_decode(rtp_t *ctx, s16_t *dest, char *buf, int len, int *outsiz /*---------------------------------------------------------------------------*/ static void buffer_put_packet(rtp_t *ctx, seq_t seqno, unsigned rtptime, bool first, char *data, int len) { abuf_t *abuf = NULL; + u32_t playtime; pthread_mutex_lock(&ctx->ab_mutex); if (!ctx->playing) { if ((ctx->flush_seqno == -1 || seq_order(ctx->flush_seqno, seqno)) && - ((ctx->synchro.required && ctx->synchro.first) || !ctx->synchro.required)) { + (ctx->synchro.status & RTP_SYNC && ctx->synchro.status & NTP_SYNC)) { ctx->ab_write = seqno-1; ctx->ab_read = seqno; ctx->skip = 0; ctx->flush_seqno = -1; ctx->playing = true; - ctx->synchro.first = false; - ctx->resent_frames = ctx->silent_frames = 0; + ctx->resent_req = ctx->resent_rec = ctx->silent_frames = ctx->discarded = 0; + playtime = ctx->synchro.time + (((s32_t)(rtptime - ctx->synchro.rtp)) * 10) / 441; + ctx->cmd_cb(RAOP_PLAY, &playtime); } else { pthread_mutex_unlock(&ctx->ab_mutex); return; @@ -422,6 +419,7 @@ static void buffer_put_packet(rtp_t *ctx, seq_t seqno, unsigned rtptime, bool fi abuf = ctx->audio_buffer + BUFIDX(seqno); ctx->ab_write = seqno; LOG_SDEBUG("packet expected seqno:%hu rtptime:%u (W:%hu R:%hu)", seqno, rtptime, ctx->ab_write, ctx->ab_read); + } else if (seq_order(ctx->ab_write, seqno)) { // newer than expected if (seqno - ctx->ab_write - 1 > ctx->latency / ctx->frame_size) { @@ -448,6 +446,7 @@ static void buffer_put_packet(rtp_t *ctx, seq_t seqno, unsigned rtptime, bool fi } else if (seq_order(ctx->ab_read, seqno + 1)) { // recovered packet, not yet sent abuf = ctx->audio_buffer + BUFIDX(seqno); + ctx->resent_rec++; LOG_DEBUG("[%p]: packet recovered seqno:%hu rtptime:%u (W:%hu R:%hu)", ctx, seqno, rtptime, ctx->ab_write, ctx->ab_read); } else { // too late @@ -455,7 +454,7 @@ static void buffer_put_packet(rtp_t *ctx, seq_t seqno, unsigned rtptime, bool fi } if (ctx->in_frames++ > 1000) { - LOG_INFO("[%p]: fill [level:%hd] [W:%hu R:%hu]", ctx, (seq_t) (ctx->ab_write - ctx->ab_read + 1), ctx->ab_write, ctx->ab_read); + LOG_INFO("[%p]: fill [level:%hd rec:%u] [W:%hu R:%hu]", ctx, (seq_t) (ctx->ab_write - ctx->ab_read + 1), ctx->resent_rec, ctx->ab_write, ctx->ab_read); ctx->in_frames = 0; } @@ -479,7 +478,7 @@ static void buffer_put_packet(rtp_t *ctx, seq_t seqno, unsigned rtptime, bool fi // push as many frames as possible through callback static void buffer_push_packet(rtp_t *ctx) { abuf_t *curframe = NULL; - u32_t now, playtime; + u32_t now, playtime, hold = max((ctx->latency * 1000) / (8 * 44100), 100); int i; // not ready to play yet @@ -492,27 +491,17 @@ static void buffer_push_packet(rtp_t *ctx) { do { curframe = ctx->audio_buffer + BUFIDX(ctx->ab_read); - playtime = ctx->synchro.time + (((s32_t)(curframe->rtptime - ctx->synchro.rtp)) * 1000) / 44100; + playtime = ctx->synchro.time + (((s32_t)(curframe->rtptime - ctx->synchro.rtp)) * 10) / 441; - /* - if (now > playtime + ctx->frame_duration) { - //LOG_INFO("[%p]: discarded frame (W:%hu R:%hu)", ctx, ctx->ab_write, ctx->ab_read); + if (now > playtime) { + LOG_DEBUG("[%p]: discarded frame now:%u missed by %d (W:%hu R:%hu)", ctx, now, now - playtime, ctx->ab_write, ctx->ab_read); + ctx->discarded++; } else if (curframe->ready) { - ctx->callback((const u8_t*) curframe->data, curframe->len); + ctx->data_cb((const u8_t*) curframe->data, curframe->len); curframe->ready = 0; - } else if (now >= playtime) { + } else if (playtime - now <= hold) { LOG_DEBUG("[%p]: created zero frame (W:%hu R:%hu)", ctx, ctx->ab_write, ctx->ab_read); - ctx->callback(silence_frame, ctx->frame_size * 4); - ctx->silent_frames++; - } else break; - */ - - if (curframe->ready) { - ctx->callback((const u8_t*) curframe->data, curframe->len); - curframe->ready = 0; - } else if (now >= playtime) { - LOG_DEBUG("[%p]: created zero frame (W:%hu R:%hu)", ctx, ctx->ab_write, ctx->ab_read); - ctx->callback(silence_frame, ctx->frame_size * 4); + ctx->data_cb(silence_frame, ctx->frame_size * 4); ctx->silent_frames++; } else break; @@ -522,16 +511,16 @@ static void buffer_push_packet(rtp_t *ctx) { } while (ctx->ab_write - ctx->ab_read + 1 > 0); if (ctx->out_frames > 1000) { - LOG_INFO("[%p]: drain [level:%hd gap:%d] [W:%hu R:%hu] [R:%u S:%u F:%u]", + LOG_INFO("[%p]: drain [level:%hd gap:%d] [W:%hu R:%hu] [R:%u S:%u F:%u D:%u] (head in %u ms) ", ctx, ctx->ab_write - ctx->ab_read, playtime - now, ctx->ab_write, ctx->ab_read, - ctx->resent_frames, ctx->silent_frames, ctx->filled_frames); + ctx->resent_req, ctx->silent_frames, ctx->filled_frames, ctx->discarded, playtime - now); ctx->out_frames = 0; } LOG_SDEBUG("playtime %u %d [W:%hu R:%hu] %d", playtime, playtime - now, ctx->ab_write, ctx->ab_read, curframe->ready); // each missing packet will be requested up to (latency_frames / 16) times - for (i = 16; seq_order(ctx->ab_read + i, ctx->ab_write); i += 16) { + for (i = 1; seq_order(ctx->ab_read + i, ctx->ab_write); i += 16) { abuf_t *frame = ctx->audio_buffer + BUFIDX(ctx->ab_read + i); if (!frame->ready && now - frame->last_resend > RESEND_TO) { rtp_request_resend(ctx, ctx->ab_read + i, ctx->ab_read + i); @@ -626,8 +615,8 @@ static void *rtp_thread_func(void *arg) { pthread_mutex_lock(&ctx->ab_mutex); - // re-align timestamp and expected local playback time - if (!ctx->latency) ctx->latency = rtp_now - rtp_now_latency; + // re-align timestamp and expected local playback time (and magic 11025 latency) + if (!ctx->latency) ctx->latency = rtp_now - rtp_now_latency + 11025; ctx->synchro.rtp = rtp_now - ctx->latency; ctx->synchro.time = ctx->timing.local + (u32_t) NTP2MS(remote - ctx->timing.remote); @@ -636,7 +625,6 @@ static void *rtp_thread_func(void *arg) { // 1st sync packet received (signals a restart of playback) if (packet[0] & 0x10) { - ctx->synchro.first = true; LOG_INFO("[%p]: 1st sync packet received", ctx); } @@ -656,7 +644,6 @@ static void *rtp_thread_func(void *arg) { // NTP timing packet case 0x53: { u64_t expected; - s64_t delta = 0; u32_t reference = ntohl(*(u32_t*)(pktp+12)); // only low 32 bits in our case u64_t remote =(((u64_t) ntohl(*(u32_t*)(pktp+16))) << 32) + ntohl(*(u32_t*)(pktp+20)); u32_t roundtrip = gettime_ms() - reference; @@ -676,50 +663,17 @@ static void *rtp_thread_func(void *arg) { ctx->timing.remote = remote; ctx->timing.local = reference; - ctx->timing.count++; - if (!ctx->timing.drift && (ctx->synchro.status & NTP_SYNC)) { - delta = NTP2MS((s64_t) expected - (s64_t) ctx->timing.remote); - ctx->timing.gap_sum += delta; - - pthread_mutex_lock(&ctx->ab_mutex); - - /* - if expected time is more than remote, then our time is - running faster and we are transmitting frames too quickly, - so we'll run out of frames, need to add one - */ - if (ctx->timing.gap_sum > GAP_THRES && ctx->timing.gap_count++ > GAP_COUNT) { - LOG_INFO("[%p]: Sending packets too fast %Ld [W:%hu R:%hu]", ctx, ctx->timing.gap_sum, ctx->ab_write, ctx->ab_read); - ctx->ab_read--; - ctx->audio_buffer[BUFIDX(ctx->ab_read)].ready = 1; - ctx->timing.gap_sum -= GAP_THRES; - ctx->timing.gap_adjust -= GAP_THRES; - /* - if expected time is less than remote, then our time is - running slower and we are transmitting frames too slowly, - so we'll overflow frames buffer, need to remove one - */ - } else if (ctx->timing.gap_sum < -GAP_THRES && ctx->timing.gap_count++ > GAP_COUNT) { - if (seq_order(ctx->ab_read, ctx->ab_write + 1)) { - ctx->audio_buffer[BUFIDX(ctx->ab_read)].ready = 0; - ctx->ab_read++; - } else ctx->skip++; - ctx->timing.gap_sum += GAP_THRES; - ctx->timing.gap_adjust += GAP_THRES; - LOG_INFO("[%p]: Sending packets too slow %Ld (skip: %d) [W:%hu R:%hu]", ctx, ctx->timing.gap_sum, ctx->skip, ctx->ab_write, ctx->ab_read); - } - - if (llabs(ctx->timing.gap_sum) < 8) ctx->timing.gap_count = 0; - - pthread_mutex_unlock(&ctx->ab_mutex); + if (ctx->synchro.status & NTP_SYNC) { + s32_t delta = NTP2MS((s64_t) expected - (s64_t) ctx->timing.remote); + ctx->cmd_cb(RAOP_TIMING, &delta); } // now we are synced on NTP (mutex not needed) ctx->synchro.status |= NTP_SYNC; LOG_DEBUG("[%p]: Timing references local:%Lu, remote:%Lx (delta:%Ld, sum:%Ld, adjust:%Ld, gaps:%d)", - ctx, ctx->timing.local, ctx->timing.remote, delta, ctx->timing.gap_sum, ctx->timing.gap_adjust, ctx->timing.gap_count); + ctx, ctx->timing.local, ctx->timing.remote); break; } @@ -778,7 +732,7 @@ static bool rtp_request_resend(rtp_t *ctx, seq_t first, seq_t last) { // do not request silly ranges (happens in case of network large blackouts) if (seq_order(last, first) || last - first > BUFFER_FRAMES / 2) return false; - ctx->resent_frames += last - first + 1; + ctx->resent_req += last - first + 1; LOG_DEBUG("resend request [W:%hu R:%hu first=%hu last=%hu]", ctx->ab_write, ctx->ab_read, first, last); @@ -797,99 +751,3 @@ static bool rtp_request_resend(rtp_t *ctx, seq_t first, seq_t last) { return true; } - -#if 0 -/*---------------------------------------------------------------------------*/ -// get the next frame, when available. return 0 if underrun/stream reset. -static short *_buffer_get_frame(rtp_t *ctx, int *len) { - short buf_fill; - abuf_t *curframe = 0; - int i; - u32_t now, playtime; - - if (!ctx->playing) return NULL; - - // skip frames if we are running late and skip could not be done in SYNC - while (ctx->skip && seq_order(ctx->ab_read, ctx->ab_write + 1)) { - ctx->audio_buffer[BUFIDX(ctx->ab_read)].ready = 0; - ctx->ab_read++; - ctx->skip--; - LOG_INFO("[%p]: Sending packets too slow (skip: %d) [W:%hu R:%hu]", ctx, ctx->skip, ctx->ab_write, ctx->ab_read); - } - - buf_fill = ctx->ab_write - ctx->ab_read + 1; - - if (buf_fill >= BUFFER_FRAMES) { - LOG_ERROR("[%p]: Buffer overrun %hu", ctx, buf_fill); - ctx->ab_read = ctx->ab_write - (BUFFER_FRAMES - 64); - buf_fill = ctx->ab_write - ctx->ab_read + 1; - } - - now = gettime_ms(); - curframe = ctx->audio_buffer + BUFIDX(ctx->ab_read); - - // use next frame when buffer is empty or silence continues to be sent - if (!buf_fill) curframe->rtptime = ctx->audio_buffer[BUFIDX(ctx->ab_read - 1)].rtptime + ctx->frame_size; - - playtime = ctx->synchro.time + (((s32_t)(curframe->rtptime - ctx->synchro.rtp))*1000)/44100; - - LOG_SDEBUG("playtime %u %d [W:%hu R:%hu] %d", playtime, playtime - now, ctx->ab_write, ctx->ab_read, curframe->ready); - - // wait if not ready but have time, otherwise send silence - if (!buf_fill || ctx->synchro.status != (RTP_SYNC | NTP_SYNC) || (now < playtime && !curframe->ready)) { - LOG_SDEBUG("[%p]: waiting (fill:%hd, W:%hu R:%hu) now:%u, playtime:%u, wait:%d", ctx, buf_fill, ctx->ab_write, ctx->ab_read, now, playtime, playtime - now); - // look for "blocking" frames at the top of the queue and try to catch-up - for (i = 0; i < min(16, buf_fill); i++) { - abuf_t *frame = ctx->audio_buffer + BUFIDX(ctx->ab_read + i); - if (!frame->ready && now - frame->last_resend > RESEND_TO) { - rtp_request_resend(ctx, ctx->ab_read + i, ctx->ab_read + i); - frame->last_resend = now; - } - } - return NULL; - } - - // when silence is inserted at the top, need to move write pointer - if (!buf_fill) { - if (!ctx->filled_frames) { - LOG_WARN("[%p]: start silence (late %d ms) [W:%hu R:%hu]", ctx, now - playtime, ctx->ab_write, ctx->ab_read); - } - ctx->ab_write++; - ctx->filled_frames++; - } else ctx->filled_frames = 0; - - if (!(ctx->out_frames++ & 0x1ff)) { - LOG_INFO("[%p]: drain [level:%hd gap:%d] [W:%hu R:%hu] [R:%u S:%u F:%u]", - ctx, buf_fill-1, playtime - now, ctx->ab_write, ctx->ab_read, - ctx->resent_frames, ctx->silent_frames, ctx->filled_frames); - } - - // each missing packet will be requested up to (latency_frames / 16) times - for (i = 16; seq_order(ctx->ab_read + i, ctx->ab_write); i += 16) { - abuf_t *frame = ctx->audio_buffer + BUFIDX(ctx->ab_read + i); - if (!frame->ready && now - frame->last_resend > RESEND_TO) { - rtp_request_resend(ctx, ctx->ab_read + i, ctx->ab_read + i); - frame->last_resend = now; - } - } - - if (!curframe->ready) { - LOG_DEBUG("[%p]: created zero frame (W:%hu R:%hu)", ctx, ctx->ab_write, ctx->ab_read); - memset(curframe->data, 0, ctx->frame_size*4); - curframe->len = ctx->frame_size * 4; - ctx->silent_frames++; - } else { - LOG_SDEBUG("[%p]: prepared frame (fill:%hd, W:%hu R:%hu)", ctx, buf_fill-1, ctx->ab_write, ctx->ab_read); - } - - *len = curframe->len; - curframe->ready = 0; - ctx->ab_read++; - - return curframe->data; -} -#endif - - - - diff --git a/components/raop/rtp.h b/components/raop/rtp.h index 0acb8c6a..554690ad 100644 --- a/components/raop/rtp.h +++ b/components/raop/rtp.h @@ -1,6 +1,7 @@ #ifndef _HAIRTUNES_H_ #define _HAIRTUNES_H_ +#include "raop_sink.h" #include "util.h" typedef struct { @@ -8,11 +9,10 @@ typedef struct { struct rtp_s *ctx; } rtp_resp_t; -typedef void (*rtp_data_cb_t)(const u8_t *data, size_t len); - -rtp_resp_t rtp_init(struct in_addr host, bool sync, bool drift, bool range, int latency, +rtp_resp_t rtp_init(struct in_addr host, int latency, char *aeskey, char *aesiv, char *fmtpstr, - short unsigned pCtrlPort, short unsigned pTimingPort, rtp_data_cb_t data_cb); + short unsigned pCtrlPort, short unsigned pTimingPort, + raop_cmd_cb_t cmd_cb, raop_data_cb_t data_cb); void rtp_end(struct rtp_s *ctx); bool rtp_flush(struct rtp_s *ctx, unsigned short seqno, unsigned rtptime); void rtp_record(struct rtp_s *ctx, unsigned short seqno, unsigned rtptime); diff --git a/components/squeezelite/buffer.c b/components/squeezelite/buffer.c index ed71d811..c251b4c5 100644 --- a/components/squeezelite/buffer.c +++ b/components/squeezelite/buffer.c @@ -64,6 +64,11 @@ void buf_flush(struct buffer *buf) { mutex_unlock(buf->mutex); } +void _buf_flush(struct buffer *buf) { + buf->readp = buf->buf; + buf->writep = buf->buf; +} + // adjust buffer to multiple of mod bytes so reading in multiple always wraps on frame boundary void buf_adjust(struct buffer *buf, size_t mod) { size_t size; diff --git a/components/squeezelite/decode_external.c b/components/squeezelite/decode_external.c index 4a66607a..127b396c 100644 --- a/components/squeezelite/decode_external.c +++ b/components/squeezelite/decode_external.c @@ -36,6 +36,10 @@ extern log_level loglevel; static raop_event_t raop_state; static bool raop_expect_stop = false; +static struct { + s32_t total, count; + u32_t start_time, msplayed; +} raop_sync; /**************************************************************************************** * Common sink data handler @@ -112,8 +116,9 @@ static void bt_sink_cmd_handler(bt_sink_cmd_t cmd, ...) output.state = OUTPUT_RUNNING; LOG_INFO("BT sink playing"); break; + case BT_SINK_STOP: + _buf_flush(outputbuf); case BT_SINK_PAUSE: - case BT_SINK_STOP: output.state = OUTPUT_STOPPED; LOG_INFO("BT sink stopped"); break; @@ -152,14 +157,38 @@ void raop_sink_cmd_handler(raop_event_t event, void *param) // this is async, so player might have been deleted switch (event) { + case RAOP_TIMING: { + u32_t now = gettime_ms(); + s32_t error; + + if (output.state < OUTPUT_RUNNING || output.frames_played_dmp < output.device_frames) break; + + raop_sync.total += *(s32_t*) param; + raop_sync.count++; + raop_sync.msplayed = now - output.updated + ((u64_t) (output.frames_played_dmp - output.device_frames) * 1000) / 44100; + error = raop_sync.msplayed - (now - raop_sync.start_time); + + LOG_INFO("backend played %u, desired %u, (back:%d raop:%d)", raop_sync.msplayed, now - raop_sync.start_time, error, raop_sync.total / raop_sync.count); + + if (error < -10) { + output.skip_frames = (abs(error) * 44100) / 1000; + output.state = OUTPUT_SKIP_FRAMES; + LOG_INFO("skipping %u frames", output.skip_frames); + } else if (error > 10) { + output.pause_frames = (abs(error) * 44100) / 1000; + output.state = OUTPUT_PAUSE_FRAMES; + LOG_INFO("pausing for %u frames", output.pause_frames); + } + + break; + } case RAOP_STREAM: - // a PLAY will come later, so we'll do the load at that time LOG_INFO("Stream", NULL); raop_state = event; + raop_sync.total = raop_sync.count = 0; output.external = true; output.next_sample_rate = 44100; - output.state = OUTPUT_BUFFER; - output.threshold = 10; + output.state = OUTPUT_STOPPED; break; case RAOP_STOP: LOG_INFO("Stop", NULL); @@ -171,14 +200,17 @@ void raop_sink_cmd_handler(raop_event_t event, void *param) LOG_INFO("Flush", NULL); raop_expect_stop = true; raop_state = event; + _buf_flush(outputbuf); output.state = OUTPUT_STOPPED; + output.frames_played = 0; break; case RAOP_PLAY: { LOG_INFO("Play", NULL); - // this where we need the OUTPUT_START_AT if (raop_state != RAOP_PLAY) { - output.external = true; - output.state = OUTPUT_RUNNING; + output.state = OUTPUT_START_AT; + output.start_at = *(u32_t*) param; + raop_sync.start_time = output.start_at; + LOG_INFO("Starting at %u (in %d ms)", output.start_at, output.start_at - gettime_ms()); } raop_state = event; break; diff --git a/components/squeezelite/squeezelite.h b/components/squeezelite/squeezelite.h index 73cc363d..eea901a4 100644 --- a/components/squeezelite/squeezelite.h +++ b/components/squeezelite/squeezelite.h @@ -539,6 +539,7 @@ unsigned _buf_cont_write(struct buffer *buf); void _buf_inc_readp(struct buffer *buf, unsigned by); void _buf_inc_writep(struct buffer *buf, unsigned by); void buf_flush(struct buffer *buf); +void _buf_flush(struct buffer *buf); void buf_adjust(struct buffer *buf, size_t mod); void _buf_resize(struct buffer *buf, size_t size); void buf_init(struct buffer *buf, size_t size);