import type {
  ConnectionState,
  IAgoraRTC,
  IAgoraRTCClient,
  IAgoraRTCRemoteUser,
  ILocalTrack,
  IRemoteAudioTrack,
  IRemoteVideoTrack,
  LowStreamParameter,
  NetworkQuality,
  UID,
} from 'agora-rtc-sdk-ng'
import { isEqual, keyBy, round } from 'lodash'
import { postZod } from 'sierra-client/api'
import { AGORA_CONFIG } from 'sierra-client/components/liveV2/services/video-call-service/helpers/agora-config'
import { isDualStreamSupported } from 'sierra-client/components/liveV2/services/video-call-service/helpers/browser-support'
import {
  connectionCanSubscribe,
  connectionIsHealthy,
} from 'sierra-client/components/liveV2/services/video-call-service/helpers/network-quality'
import { rejectOnTimeout } from 'sierra-client/components/liveV2/services/video-call-service/helpers/throw-on-timeout'
import {
  ClientJoinState,
  NetworkQualityStats,
  RemoteParticipant,
  VideoSubscribeConfig,
} from 'sierra-client/components/liveV2/services/video-call-service/types'
import { SequentialTaskRunner } from 'sierra-client/lib/sequential-task-runner'
import { ExponentialMovingAverage } from 'sierra-client/lib/statistics/exponential-moving-average'
import { MovingAverage } from 'sierra-client/lib/statistics/moving-average'
import { RollingWindowAverage } from 'sierra-client/lib/statistics/rolling-window-average'
import { logger } from 'sierra-client/logger/logger'
import { typedPost } from 'sierra-client/state/api'
import { RequestError } from 'sierra-domain/error'
import { XRealtimeUserAgoraAccessToken, XRealtimeUserSetAgoraUid } from 'sierra-domain/routes'
import { retry } from 'ts-retry-promise'

const AGORA_HIGH_REMOTE_STREAM_QUALITY = 0
const AGORA_LOW_REMOTE_STREAM_QUALITY = 1

type ClientOptions = {
  proxy?: boolean
  onParticipantsChanged?: () => void
  onNetworkQualityChanged?: () => void
  onConnectionStateChanged?: () => void
  lowStreamParameter?: LowStreamParameter
  useExperimentalStatistics: boolean
  useSlowJoinProxy: boolean
  useVp9: boolean
}

type RemoteParticipantVolume = {
  id: string
  volume: number
}

async function logTime<T>(fn: () => Promise<T>, name: string): Promise<T> {
  const t0 = performance.now()
  logger.debug(`[logTime] ${name} started`, { startTime: t0 })

  const res = await fn()
  const t1 = performance.now()
  const duration = t1 - t0

  logger.debug(`[logTime] ${name} done, took ${duration} milliseconds.`, {
    startTime: t0,
    duration,
  })

  return res
}

function delay(ms: number): Promise<void> {
  return new Promise(resolve => setTimeout(() => resolve(), ms))
}

type ConnectionDisconnectedReason = 'NETWORK_ERROR' | 'LEAVE'

export class AgoraClient {
  private networkQuality: NetworkQualityStats | undefined
  private uplinkWindowedAverage: MovingAverage
  private downlinkWindowedAverage: MovingAverage
  private useProxy = false
  private isUsingProxy: boolean | undefined
  private useExperimentalStatistics = false
  private videoSubscribeConfig: VideoSubscribeConfig = []
  private updateSubscriptionsTaskRunner = new SequentialTaskRunner(1)
  private joinChannelTaskRunner = new SequentialTaskRunner(1)
  private publishTaskRunner = new SequentialTaskRunner(1)
  private checkAudioSubscriptionInterval: ReturnType<typeof setInterval> | undefined
  private transcriptionToken: string | undefined = undefined

  private constructor(
    private client: IAgoraRTCClient,
    private options: ClientOptions
  ) {
    client.on('user-joined', this.handleNewUserJoinedChannel)
    client.on('user-left', this.handleParticipantsChanged)
    client.on('network-quality', this.handleNetworkQualityChanged)
    client.on('connection-state-change', this.handleConnectionStateChanged)

    client.on('is-using-cloud-proxy', this.handleIsUsingCloudProxyCallback)
    client.on('join-fallback-to-proxy', this.handleFallbackToProxy)

    client.on('user-unpublished', this.handleParticipantsChanged)
    client.on('user-published', this.handleParticipantPublished)
    this.useProxy = options.proxy ?? false
    this.useExperimentalStatistics = options.useExperimentalStatistics
    this.uplinkWindowedAverage = this.getMovingAverageImplementation()
    this.downlinkWindowedAverage = this.getMovingAverageImplementation()
  }

  private getMovingAverageImplementation(): MovingAverage {
    if (this.useExperimentalStatistics) {
      return new ExponentialMovingAverage(10)
    }
    return new RollingWindowAverage(10)
  }

  getAgoraClient(): IAgoraRTCClient {
    return this.client
  }

  getIsUsingProxy(): boolean | undefined {
    return this.isUsingProxy
  }

  getTranscriptionToken(): string | undefined {
    return this.transcriptionToken
  }

  handleConnectionStateChanged = (
    currentState: ConnectionState,
    previousState: ConnectionState,
    disconnectedReason?: ConnectionDisconnectedReason
  ): void => {
    logger.info('Agora: connection-state-change', {
      tags: { currentState, previousState, disconnectedReason },
    })

    if (currentState === 'CONNECTED') {
      void this.runStateChecks()
      this.checkAudioSubscriptionInterval = setInterval(this.checkAudioSubscriptions, 5000)
    } else {
      clearInterval(this.checkAudioSubscriptionInterval)
    }

    // It seems agora can become disconnected with network errors, which it doesn't recover from
    // we try to recover from this manually
    if (currentState === 'DISCONNECTED' && disconnectedReason === 'NETWORK_ERROR') {
      logger.info('Agora: connection-state-change disconnected with error')
      void this.runStateChecks()
    }

    this.options.onConnectionStateChanged?.()
  }

  runStateChecks = async (): Promise<void> => {
    await this.updateChannelStateFromParams()
    await this.checkAudioSubscriptions()
    await this.updatePublishState()
  }

  handleNewUserJoinedChannel = async (participant: IAgoraRTCRemoteUser): Promise<void> => {
    await this.client.setStreamFallbackOption(participant.uid, 2) // AUDIO_ONLY
    this.handleParticipantsChanged()
  }

  handleParticipantsChanged = (): void => {
    this.updateVideoSubsciptions()
  }

  handleNetworkQualityChanged = (stats: NetworkQuality): void => {
    const uplinkNow = this.uplinkWindowedAverage.push(stats.uplinkNetworkQuality)
    const downlinkNow = this.downlinkWindowedAverage.push(stats.downlinkNetworkQuality)
    const newStats: NetworkQualityStats = {
      downlink: {
        currentDownlink: stats.downlinkNetworkQuality,
        windowedDownlink: round(downlinkNow) as typeof stats.downlinkNetworkQuality,
      },
      uplink: {
        currentUplink: stats.uplinkNetworkQuality,
        windowedUplink: round(uplinkNow) as typeof stats.uplinkNetworkQuality,
      },
    }

    if (!isEqual(this.networkQuality, newStats)) {
      this.networkQuality = newStats
      this.options.onNetworkQualityChanged?.()
    }
  }

  handleIsUsingCloudProxyCallback = (isUsingProxy: boolean): void => {
    logger.info(`Is using cloud proxy - isUsingProxy=${isUsingProxy} useProxySetting=${this.useProxy}`, {
      isUsingProxy,
      useProxySetting: this.useProxy,
    })

    this.isUsingProxy = isUsingProxy
    // if the client joined using a proxy, we default to using the proxy going forward
    this.useProxy = isUsingProxy || this.useProxy
  }

  handleFallbackToProxy = (slowJoin: boolean = false): void => {
    logger.info(`join fallback to proxy, will use proxy going forward - slowJoin=${slowJoin}`)
    this.useProxy = true
  }

  handleParticipantPublished = async (
    participant: IAgoraRTCRemoteUser,
    mediaType: 'audio' | 'video'
  ): Promise<void> => {
    if (this.client.connectionState !== 'CONNECTED' && this.client.connectionState !== 'RECONNECTING') {
      logger.debug('Client not in a state where it should try to subscribe', {
        connectionState: this.client.connectionState,
      })
      return
    }

    try {
      if (mediaType === 'audio') {
        await this.subscribeWithRetry(participant, 'audio')
      } else {
        this.updateVideoSubsciptions()
      }
    } catch (error) {
      logger.error('Error subscribing to stream', {
        error,
        mediaType,
        agoraId: participant.uid,
      })
    }

    this.options.onParticipantsChanged?.()
  }

  private async getAccessToken(channelName: string): Promise<{ accessToken: string; appId: string }> {
    const result = await typedPost(XRealtimeUserAgoraAccessToken, {
      channelName,
    })
    const { accessToken, appId, transcriptionToken } = result
    this.transcriptionToken = transcriptionToken

    return { accessToken, appId }
  }

  private async getAccessTokenWithRetry(
    channelName: string
  ): Promise<{ accessToken: string; appId: string }> {
    return retry(
      () => {
        return this.getAccessToken(channelName)
      },
      {
        retries: 2,
        delay: 100,
        backoff: 'LINEAR',
        logger: (message: string) => {
          logger.info(`getAccessToken: ${message}`)
        },
        retryIf: (error: Error) => {
          if (RequestError.isAccessError(error)) {
            return false
          }
          return true
        },
      }
    )
  }

  private async notifyBackendOfUid(channelName: string, agoraUid: UID): Promise<void> {
    await retry(
      () => {
        return postZod(XRealtimeUserSetAgoraUid, { channelName, agoraUid: agoraUid.toString() })
      },
      {
        retries: 4,
        delay: 500,
        backoff: 'LINEAR',
        logger: (message: string) => {
          logger.info(`notifyBackendOfUid: ${message}`)
        },
        retryIf: (error: Error) => {
          if (RequestError.isAccessError(error)) {
            return false
          }
          return true
        },
      }
    )
  }

  private async useProxyOnSlowJoin(
    appId: string,
    channelName: string,
    accessToken: string,
    joinAsUserId?: number
  ): Promise<UID> {
    const res = await Promise.race([
      this.client.join(appId, channelName, accessToken, joinAsUserId),
      delay(5000).then(() => 'timeout' as const),
    ])

    if (res === 'timeout') {
      this.handleFallbackToProxy(true)
      await this.client.leave()
      this.client.startProxyServer(AGORA_CONFIG.proxyMode)
      return this.client.join(appId, channelName, accessToken, joinAsUserId)
    }
    return res
  }

  private async joinWithRetry(
    appId: string,
    channelName: string,
    accessToken: string,
    joinAsUserId?: number
  ): Promise<string> {
    try {
      const uid = await retry(
        () => {
          if (this.useProxy) {
            try {
              this.client.startProxyServer(AGORA_CONFIG.proxyMode)
            } catch (error) {
              logger.error('failed to start proxy', { error })
            }
          }

          return this.options.useSlowJoinProxy
            ? this.useProxyOnSlowJoin(appId, channelName, accessToken, joinAsUserId)
            : this.client.join(appId, channelName, accessToken, joinAsUserId)
        },
        {
          retries: 2,
          delay: 100,
          backoff: 'LINEAR',
          logger: (message: string) => {
            logger.info(`joinWithRetry: ${message}`)
          },

          timeout: 30 * 1000, // timeout after 30 seconds
        }
      )
      return uid.toString()
    } catch (error) {
      if (error instanceof Error && 'lastError' in error) {
        throw error.lastError
      }
      throw error
    }
  }

  private async subscribeWithRetry(
    participant: IAgoraRTCRemoteUser,
    mediaType: 'audio' | 'video'
  ): Promise<void> {
    try {
      await retry(
        async () => {
          await this.client.subscribe(participant, mediaType)
        },
        {
          retries: 2,
          retryIf: () => {
            return connectionCanSubscribe(this.connectionState)
          },
          delay: 100,
          backoff: 'LINEAR',
          logger: (message: string) => {
            logger.info(`subscribeWithRetry: ${message}`)
          },
        }
      )
    } catch (error) {
      if (error instanceof Error && 'lastError' in error) {
        throw error.lastError
      }
      throw error
    }
  }

  updateVideoSubsciptions(): void {
    // Because this action both subscribes and unsubscribes to streams it's very sensitive to race conditions
    // Two calls with different configs before the first one has finished can cause the end state to be wrong
    // To avoid this we queue up the calls and only run one at a time

    this.updateSubscriptionsTaskRunner.push(async () => {
      // The values refered to in `this` can change since it's async, so we get copies of them before the async calls
      const remoteUsers = this.client.remoteUsers
      const currentConfig = this.videoSubscribeConfig

      const config = keyBy(currentConfig, 'userId')

      const subList = []
      const unsubList = []

      // Subscribe and unsubscribe to the available video streams to match the config
      for (const user of remoteUsers) {
        const userSubValue = { user, mediaType: 'video' as const }

        const userConfig = config[user.uid.toString()]
        const isSubscribedToVideo = !!user.videoTrack
        const userIsPublishingVideo = user.hasVideo
        const shouldSubscribeToVideo =
          userConfig !== undefined && userConfig.quality !== 'off' && userIsPublishingVideo

        if (shouldSubscribeToVideo && !isSubscribedToVideo) subList.push(userSubValue)
        if (!shouldSubscribeToVideo && isSubscribedToVideo) unsubList.push(userSubValue)
      }

      // TODO: handle if the mass subscribe fails? Perhaps we should retry? in a few seconds?
      await Promise.allSettled([
        subList.length > 0 && this.client.massSubscribe(subList),
        unsubList.length > 0 && this.client.massUnsubscribe(unsubList),
      ])

      const users = keyBy(this.client.remoteUsers, user => user.uid.toString())
      const updatedStreamTypes = []
      // Ensure the video quality matches the config
      for (const config of currentConfig) {
        if (config.quality === 'off') continue
        const user = users[config.userId]
        if (!user || !user.videoTrack) continue

        updatedStreamTypes.push(
          this.client.setRemoteVideoStreamType(
            user.uid,
            config.quality === 'low' ? AGORA_LOW_REMOTE_STREAM_QUALITY : AGORA_HIGH_REMOTE_STREAM_QUALITY
          )
        )
      }

      await Promise.all(updatedStreamTypes)
      this.options.onParticipantsChanged?.()
    })
  }

  checkAudioSubscriptions = async (): Promise<void> => {
    const usersNotSubscribedTo = this.client.remoteUsers.filter(user => user.hasAudio && !user.audioTrack)

    if (usersNotSubscribedTo.length > 0) {
      logger.info('[checkAudioSubscriptions] Not subscribing to all available audio streams', {
        usersNotSubscribedTo: usersNotSubscribedTo.map(user => user.uid.toString()),
        noUsersNotSubscribedTo: usersNotSubscribedTo.length,
      })

      const subscribeResults = await this.client.massSubscribe(
        usersNotSubscribedTo.map(user => ({ user, mediaType: 'audio' }))
      )

      const errorList = subscribeResults.filter(result => result.error)
      if (errorList.length > 0) {
        logger.error('[checkAudioSubscriptions] Error subscribing to audio streams', {
          users: errorList.map(error => error.user.uid.toString()),
          noUsers: errorList.length,
        })
      }
      this.options.onParticipantsChanged?.()
    }
  }

  getRemoteUsers(): RemoteParticipant[] {
    return this.client.remoteUsers.map(user => ({
      id: user.uid.toString(),
      isPublishingAudio: user.hasAudio,
      isPublishingVideo: user.hasVideo,
      isSubscribedToAudio: !!user.audioTrack,
      isSubscribedToVideo: !!user.videoTrack,
    }))
  }

  getVideoSubscribeConfig(): VideoSubscribeConfig {
    return this.videoSubscribeConfig
  }

  getRemoteUserVideoTrack(userId: string): IRemoteVideoTrack | undefined {
    const user = this.client.remoteUsers.find(user => user.uid.toString() === userId)
    return user?.videoTrack
  }

  getRemoteUserAudioTrack(userId: string): IRemoteAudioTrack | undefined {
    const user = this.client.remoteUsers.find(user => user.uid.toString() === userId)
    return user?.audioTrack
  }

  getRemoteUserVolumes(): RemoteParticipantVolume[] {
    return this.client.remoteUsers
      .filter(user => user.audioTrack !== undefined)
      .map(user => ({
        id: user.uid.toString(),
        volume: user.audioTrack?.getVolumeLevel() ?? 0,
      }))
  }

  get connectionState(): ConnectionState {
    return this.client.connectionState
  }

  get joinState(): ClientJoinState {
    return this.channelJoinState
  }

  get channelName(): string | undefined {
    return this.client.channelName
  }

  get clientId(): string | undefined {
    return this.client.uid?.toString()
  }

  get isConnected(): boolean {
    const connectionState = this.client.connectionState

    return (
      connectionIsHealthy(connectionState) &&
      this.client.channelName !== undefined &&
      this.clientId !== undefined
    )
  }

  getNetworkQualityStats(): NetworkQualityStats | undefined {
    return this.networkQuality
  }

  get isPublishing(): boolean {
    return this.client.localTracks.length > 0
  }

  get isPublishingAudio(): boolean {
    return this.client.localTracks.find(track => track.trackMediaType === 'audio') !== undefined
  }

  get isPublishingVideo(): boolean {
    return this.client.localTracks.find(track => track.trackMediaType === 'video') !== undefined
  }

  get role(): 'host' | 'audience' {
    return this.client.role
  }

  private channelParams?: { channelName: string; joinAsUserId?: number } = undefined
  private channelJoinState: ClientJoinState = 'not-joined'

  private updateChannelStateFromParams(): Promise<void> {
    // the goal is to ensure that the current channel state matches the params that has been specified
    // and as long as the state matches it's not going to do anything
    // and if the state doesn't match it's going to change it to match.
    //
    // It's important that the task is idempotent!
    //
    // this ensures we can run this often and safly, and it's not going to cause any issues
    // and we can always recover should we run into issues when joining a channel
    this.joinChannelTaskRunner.push(async () => {
      const params = this.channelParams
      const currentChannel = this.channelName
      const currentUserId = this.clientId

      const currentStateMatchDesiredState =
        params?.channelName === currentChannel &&
        ((params?.joinAsUserId !== undefined && `${params.joinAsUserId}` === currentUserId) ||
          params?.joinAsUserId === undefined) &&
        // while the client is connecting we don't know what the client is trying to do
        // so we say it's not the current state so we can ensure we create the correct one
        this.connectionState !== 'CONNECTING'

      if (currentStateMatchDesiredState) {
        // if the current state matches the desired state, we can only be in one of two states: joined or not joined
        const currentState: ClientJoinState = currentChannel !== undefined ? 'joined' : 'not-joined'
        this.channelJoinState = currentState
        return
      }

      this.channelJoinState = 'changing'

      if (currentChannel !== undefined || this.connectionState === 'CONNECTING') {
        await logTime(() => this._leave(), 'this._leave()')
      }

      // since the leave call above is async the channel params may have changed
      const paramsHasChanged = this.channelParams !== params
      const dontWantToBeInAnyChannel = params === undefined

      // if the params has changed we stop this task so the next one can restart
      // this assumes there is another task scheduled to run after this one
      if (paramsHasChanged) {
        return
      }

      if (dontWantToBeInAnyChannel) {
        this.channelJoinState = 'not-joined'
        return
      }

      try {
        const { accessToken, appId } = await this.getAccessTokenWithRetry(params.channelName)
        const uid = await this.joinWithRetry(appId, params.channelName, accessToken, params.joinAsUserId)
        void this.notifyBackendOfUid(params.channelName, uid)
        this.channelJoinState = 'joined'
      } catch (error) {
        logger.error('[updateChannelStateFromParams] Error joining channel', { error })
        if (RequestError.isAccessError(error)) {
          this.channelJoinState = 'auth-error'
          return
        }

        // wait 3 seconds before retrying
        // but also block the task runner so it still has ongoing tasks
        await delay(3000)
        void this.updateChannelStateFromParams()
      }
    })

    return new Promise<void>(resolve => this.joinChannelTaskRunner.once('drained', resolve))
  }

  async join(channelName: string, joinAsUserId?: number): Promise<string | undefined> {
    this.channelParams = { channelName, joinAsUserId }
    await this.updateChannelStateFromParams()
    if (this.channelJoinState !== 'joined') {
      throw new Error('Failed to join channel')
    }
    return this.clientId
  }

  async leave(): Promise<void> {
    this.channelParams = undefined
    await this.updateChannelStateFromParams()
  }

  private async _leave(): Promise<void> {
    // Make sure we clean up all publishing tracks when leaving the channel
    // Otherwise the tracks will be kept in the channel and will be published again if you re-join
    void this.unpublish()

    if (connectionIsHealthy(this.client.connectionState)) {
      await this.client.leave()
    }

    if (this.useProxy) {
      try {
        this.client.stopProxyServer()
      } catch (error) {
        logger.error('failed to stop proxy', { error })
      }
    }

    this.isUsingProxy = undefined
  }

  private async updatePublishState(): Promise<void> {
    this.publishTaskRunner.push(async () => {
      // don't publish if we're not connected
      if (this.client.connectionState !== 'CONNECTED') return

      const currentPublishedTracks = this.client.localTracks
      const trackConfig = this.tracksToPublish

      const tracksToPublish = []
      const tracksToUnpublish = []

      for (const track of trackConfig ?? []) {
        if (currentPublishedTracks.includes(track)) continue
        tracksToPublish.push(track)
      }

      for (const track of currentPublishedTracks) {
        if (trackConfig?.includes(track) === true) continue
        tracksToUnpublish.push(track)
      }

      logger.info('[updatePublishState]: updating state', {
        tracksToUnpublishLength: tracksToUnpublish.length,
        tracksToPublishLength: tracksToPublish.length,
      })

      try {
        if (tracksToUnpublish.length) {
          await rejectOnTimeout(
            this.client.unpublish(tracksToUnpublish),
            'this.client.unpublish(tracksToUnpublish)',
            3000
          )
        }

        // check if config has changed, and if so, abort
        if (trackConfig !== this.tracksToPublish) return

        if (tracksToPublish.length) {
          await rejectOnTimeout(this.client.setClientRole('host'), "this.client.setClientRole('host')", 3000) // Must be host to publish
          await rejectOnTimeout(
            this.client.publish(tracksToPublish),
            'this.client.publish(tracksToPublish)',
            3000
          )
        }

        if (this.client.localTracks.length === 0) {
          await rejectOnTimeout(
            this.client.setClientRole('audience'),
            "await this.client.setClientRole('audience')",
            3000
          )
        }
      } catch (error) {
        logger.error('[updatePublishState] Error publishing tracks', {
          error,
          connectionState: this.client.connectionState,
        })

        // wait 3 seconds before retrying
        // but also block the task runner so it still has ongoing tasks
        await delay(3000)
        void this.updatePublishState()
      }
    })

    return new Promise<void>(resolve => this.publishTaskRunner.once('drained', resolve))
  }

  private tracksToPublish?: ILocalTrack[]
  private async setPublishedTracks(tracks?: ILocalTrack[]): Promise<void> {
    this.tracksToPublish = tracks
    await this.updatePublishState()
  }

  async publish(tracks: ILocalTrack[]): Promise<void> {
    return this.setPublishedTracks(tracks)
  }

  async unpublish(): Promise<void> {
    return this.setPublishedTracks()
  }

  setVideoSubscribeConfig(config: VideoSubscribeConfig): void {
    this.videoSubscribeConfig = config
    this.updateVideoSubsciptions()
  }

  static async createWrappedClient(sdk: IAgoraRTC, options: ClientOptions): Promise<AgoraClient> {
    const agoraClient = await this.createClient(sdk, options.useVp9, options.lowStreamParameter)
    return new AgoraClient(agoraClient, options)
  }

  /**
   * Since the construction of the client is async we expose a factory method to create a new instance
   * @param sdk agora sdk instance
   * @returns
   */
  static createClient = async (
    sdk: IAgoraRTC,
    useVp9: boolean,
    lowStreamParameter?: LowStreamParameter
  ): Promise<IAgoraRTCClient> => {
    try {
      const support = await sdk.getSupportedCodec()
      const supportsVp9 = support.video.includes('VP9')
      const codec = supportsVp9 && useVp9 ? 'vp9' : AGORA_CONFIG.codec
      const client = sdk.createClient({ codec, mode: AGORA_CONFIG.mode })

      if (isDualStreamSupported && lowStreamParameter) {
        try {
          await client.enableDualStream()
          client.setLowStreamParameter(lowStreamParameter)
        } catch {
          logger.captureWarning('Failed to enable dual stream')
        }
      }

      return client
    } catch (e) {
      console.error(e)
      throw e
    }
  }
}
