'use strict'

import * as rxjs from 'rxjs'
import * as operators from 'rxjs/operators'
import RequestJs from '@gidw/request-js'
import * as BasUtil from '@basalte/bas-util'
import BasError from '../errors/baserror'

var _AS_CONNECTION_INIT = 'connection_init'
var _AS_CONNECTION_ACK = 'connection_ack'
var _AS_START = 'start'
var _AS_START_ACK = 'start_ack'
var _AS_DATA = 'data'
var _AS_STOP = 'stop'
var _AS_COMPLETE = 'complete'
var _AS_ERROR = 'error'
var _AS_KA = 'ka'

var _WS_TIMEOUT = 2000

var _WS_PROTOCOL = 'graphql-ws'
var _WS_INIT_PAYLOAD = {}
var _WS_PARAM_PAYLOAD = btoa(JSON.stringify(_WS_INIT_PAYLOAD))
var _WS_PATH_PAYLOAD_PARAM = '&payload=' + _WS_PARAM_PAYLOAD
var _WS_CONNECTION_INIT = JSON.stringify({
  type: _AS_CONNECTION_INIT
})

var _K_ERRORS = 'errors'
var _K_ERROR_TYPE = 'errorType'

var _E_WAF_FORBIDDEN = 'WAFForbiddenException'

var LOGGING = false

var _idCounter = 0

function _getId () {

  return _idCounter++
}

/**
 * @callback CBasUnsubscribe
 */

/**
 * @typedef {Object} TBasAppSyncSubscribeIdContext
 * @property {string} id
 */

/**
 * @typedef {Object} TBasAppSyncQuery
 * @property {string} query
 * @property {Object} [variables]
 */

/**
 * @callback CBasAppSyncTokenGenerator
 * @returns {Promise<string>}
 */

/**
 * @typedef {Object} TBasAppSyncQueryOptions
 * @property {number[]} [multipleRequestOffsets]
 */

/**
 * @typedef {Object} TBasAppSyncSubscribeOptions
 * @property {number[]} [multipleSubscriptionLinkOffsets]
 */

/**
 * @callback CBasAppSyncSocketCallback
 * @param {?BasError} error
 * @param {TBasAppSyncSocket} result
 */

/**
 * @callback CBasAppSyncSocketAbort
 */

/**
 * @typedef {Object} TBasAppSyncSocket
 * @property {CBasAppSyncSocketAbort} abort
 * @property {boolean} finished
 * @property {BasAppSyncSocket} [ws]
 */

/**
 * @typedef {Object} TBasAppSyncRequest
 * @property {boolean} finished
 * @property {CBasAppSyncRequestAbort} abort
 * @property {RequestJs.RequestJsResult} [result]
 */

/**
 * @callback CBasAppSyncRequestAbort
 */

/**
 * @callback CBasAppSyncRequestCallback
 * @param {?BasError} error
 * @param {TBasAppSyncRequest} result
 */

/**
 * @typedef {Object} TBasAppSyncSubscription
 * @property {string} id
 * @property {rxjs.Subscriber} subscriber
 */

/**
 * @typedef {Object} TBasAppSyncSocketRequest
 * @property {boolean} finished
 * @property {string} id
 * @property {CBasAppSyncSocketRequestResolve} resolve
 * @property {CBasAppSyncSocketRequestReject} reject
 * @property {number} [timeoutId]
 */

/**
 * @callback CBasAppSyncSocketRequestResolve
 * @param {string} id
 */

/**
 * @callback CBasAppSyncSocketRequestReject
 * @param {*} error
 */

/**
 * @typedef {Object} TBasAppSyncSocketResult
 * @property {WebSocket} socket
 * @property {Object} initialPayload
 */

/**
 * @constructor
 * @param {string} apiUrlId
 * @param {string} region
 * @param {CBasAppSyncTokenGenerator} tokenGenerator
 */
function BasAppSyncClient (
  apiUrlId,
  region,
  tokenGenerator
) {
  /**
   * @type {string}
   */
  this._appSyncHost =
    apiUrlId + '.appsync-api.' + region + '.amazonaws.com'

  /**
   * @type {string}
   */
  this._appSyncRealtimeHost =
    apiUrlId + '.appsync-realtime-api.' + region + '.amazonaws.com'

  /**
   * @type {string}
   */
  this._httpLink = 'https://' + this._appSyncHost + '/graphql'

  /**
   * @type {string}
   */
  this._wsLink = 'wss://' + this._appSyncRealtimeHost + '/graphql'

  this._stopped = false

  /**
   * @type {CBasAppSyncTokenGenerator}
   */
  this._tokenGenerator = tokenGenerator

  this.__validSubscriptionLinkFilter =
    this._validSubscriptionLinkFilter.bind(this)
  this.__subscriptionLinkFilter =
    this._subscriptionLinkFilter.bind(this)

  /**
   * @private
   * @type {TBasAppSyncSubscription[]}
   */
  this._subscribers = []

  /**
   * @type {rxjs.Subject}
   */
  this.subscriptionEvents = new rxjs.Subject()

  /**
   * @private
   * @type {?BasAppSyncSocket}
   */
  this._subscriptionLink = null

  // TODO Detect if there no subscribers on the subjects anymore?
  //  To clear/stop any ongoing link creations?

  /**
   * @private
   * @type {rxjs.BehaviorSubject<BasAppSyncSocket>}
   */
  this._subscriptionLinkSubject = new rxjs.BehaviorSubject(null)
    .pipe(operators.filter(this.__subscriptionLinkFilter))

  /**
   * @private
   * @type {?rxjs.Subscription}
   */
  this._subscriptionLinkSubscription = null

  /**
   * @private
   * @type {rxjs.BehaviorSubject<BasAppSyncSocket>}
   */
  this._newSubscriptionLinkSubject = new rxjs.BehaviorSubject(null)
    .pipe(operators.filter(this.__validSubscriptionLinkFilter))

  /**
   * @private
   * @type {?rxjs.Subscription}
   */
  this._newSubscritpionLinkSubscription = null

  this._createSubscriptionLink()
}

/**
 * Subscription Event - start
 *
 * @constant {string}
 */
BasAppSyncClient.SE_START = _AS_START_ACK

/**
 * Subscription Event - complete
 *
 * @type {string}
 */
BasAppSyncClient.SE_COMPLETE = _AS_COMPLETE

/**
 * @constant {string}
 */
BasAppSyncClient.ERR_WAF = 'BasAppSyncClientErrorWAF'

/**
 * @param {TBasAppSyncQuery} config
 * @param {TBasAppSyncQueryOptions} [options]
 * @returns {Promise}
 */
BasAppSyncClient.prototype.query = function (
  config,
  options
) {

  var _this, _token, _log
  var _id, _tStart, _tToken

  if (this._stopped) {

    return Promise.reject(new BasError(
      'Client invalid',
      undefined,
      'Client is stopped'
    ))
  }

  _this = this

  _log = config.query.indexOf('getIceServers') > -1

  if (_log) {

    if (LOGGING) {

      _id = _getId()
      _tStart = Date.now()

      // eslint-disable-next-line no-console
      console.info('getIceServers (' + _id + ')')
    }
  }

  return this._tokenGenerator().then(_onToken)

  /**
   * @private
   * @param {string} token
   * @returns {Promise}
   */
  function _onToken (token) {

    _token = token

    if (_log) {

      if (LOGGING) {

        _tToken = Date.now()

        // eslint-disable-next-line no-console
        console.info(
          'getIceServers (' + _id + ') TOKEN RECEIVED',
          (_tToken - _tStart)
        )
      }
    }

    return new Promise(_requestPromiseConstructor)
  }

  function _requestPromiseConstructor (resolve, reject) {

    var _request, _data

    _data = {
      operationName: null,
      variables: {}
    }

    _data.query = config.query

    if (config.variables) _data.variables = config.variables

    _request = _this._query(
      {
        authorization: _token
      },
      JSON.stringify(_data),
      _onRequest,
      options
    )

    /**
     * @private
     * @param error
     */
    function _onRequest (error) {

      if (error) {

        reject(error)

      } else {

        if (_request.result && _request.result.result) {

          resolve(_request.result.result.data)

        } else {

          reject(new BasError(
            'Invalid result',
            _request,
            'Invalid request.result.result'
          ))
        }
      }
    }
  }
}

/**
 * @private
 * @param {Object} headers
 * @param {string} body
 * @param {CBasAppSyncRequestCallback} callback
 * @param {TBasAppSyncQueryOptions} [options]
 * @returns {TBasAppSyncRequest}
 */
BasAppSyncClient.prototype._query = function (
  headers,
  body,
  callback,
  options
) {

  var _result, _aborted, _cbCalled
  var _config, _requests, _wafException
  var length, i, offset

  _wafException = false

  _result = {
    finished: false,
    abort: abort
  }

  _config = {
    method: 'POST',
    url: this._httpLink,
    json: true
  }

  if (headers) _config.headers = headers
  if (body) _config.data = body

  _requests = []

  _requests.push(_startRequest())

  if (options) {

    if (Array.isArray(options.multipleRequestOffsets)) {

      length = options.multipleRequestOffsets.length
      for (i = 0; i < length; i++) {

        offset = options.multipleRequestOffsets[i]

        if (offset) {

          _requests.push(_startRequest(offset))
        }
      }
    }
  }

  return _result

  /**
   * @private
   * @param {number} [timeout]
   * @returns {Object}
   */
  function _startRequest (timeout) {

    var _requestResult, _request

    _requestResult = {
      finished: false,
      timeoutId: 0,
      abort: _abort
    }

    if (timeout) {

      _requestResult.timeoutId = setTimeout(_start, timeout)

    } else {

      _start()
    }

    return _requestResult

    function _start () {

      if (_aborted || _cbCalled) return

      _request = RequestJs(_config, _onRequest)
    }

    function _onRequest (error, result) {

      var obj, errors, _length, _i, _error

      _requestResult.finished = true

      if (error) {

        // Do nothing

        _requestResult.error = error

        if (error.data) {
          try {
            obj = JSON.parse(error.data)
          } catch (e) {
            // Empty
          }

          if (obj && Array.isArray(obj[_K_ERRORS])) {

            errors = obj[_K_ERRORS]

            _length = errors.length
            for (_i = 0; _i < _length; _i++) {
              _error = errors[_i]

              if (_error && _error[_K_ERROR_TYPE] === _E_WAF_FORBIDDEN) {

                // WAF is stopping request, stop other requests

                _wafException = true

                _stopAllRequests()
              }
            }
          }
        }

      } else {

        if (result) {

          _requestResult.result = result

        } else {

          // Do nothing
        }
      }

      if (_aborted || _cbCalled) return

      _checkRequests()
    }

    function _abort () {

      _requestResult.finished = true

      clearTimeout(_requestResult.timeoutId)
      _requestResult.timeoutId = 0

      if (_request && _request.abort) _request.abort()
    }
  }

  function _stopAllRequests () {

    var _length, _i, request

    _length = _requests.length
    for (_i = 0; _i < _length; _i++) {

      request = _requests[_i]

      if (request && !request.finished && request.abort) {

        request.abort()
      }
    }
  }

  function _checkRequests () {

    var _length, _i, request, allFinished

    if (_aborted || _cbCalled) return

    allFinished = true

    _length = _requests.length
    for (_i = 0; _i < _length; _i++) {

      request = _requests[_i]

      if (request) {

        if (request.finished) {

          if ('result' in request) {

            _result.result = request

            _clearTimeouts()

            _cb(null)

            _cleanup()

            return
          }

        } else {

          allFinished = false
        }
      }
    }

    if (_aborted || _cbCalled) return

    if (_wafException) {

      _cb(new BasError(
        BasAppSyncClient.ERR_WAF,
        undefined,
        'WAF exception'
      ))
      _cleanup()
    }

    if (_aborted || _cbCalled) return

    if (allFinished) {

      _cb(new BasError(
        'Request error',
        undefined,
        'No request succeeded'
      ))
      _cleanup()
    }
  }

  function _clearTimeouts () {

    var _length, _i, _request

    _length = _requests.length
    for (_i = 0; _i < _length; _i++) {

      _request = _requests[_i]

      if (_request) {

        clearTimeout(_request.timeoutId)
        _request.timeoutId = 0
      }
    }
  }

  function _abortRequests () {

    var _length, _i, request

    _length = _requests.length
    for (_i = 0; _i < _length; _i++) {

      request = _requests[_i]

      if (request) {

        request.abort()
      }
    }
  }

  function _cleanup () {

    _clearTimeouts()
    _abortRequests()
  }

  function abort () {

    _aborted = true

    _clearTimeouts()

    _cb(new BasError(
      'Abort',
      undefined,
      'Aborted'
    ))

    _cleanup()
  }

  /**
   * @private
   * @param {?BasError} error
   */
  function _cb (error) {

    if (!_cbCalled) {

      _cbCalled = true

      _result.finished = true

      if (BasUtil.isFunction(callback)) {

        callback(error, _result)
      }
    }
  }
}

/**
 * The idContext id property will be updated with the last subscription id.
 * Subscribing multiple times on this Observable,
 * will overwrite the idContext id!
 * Only reliable in situation with a single subscriber.
 *
 * @param {TBasAppSyncQuery} config
 * @param {string} subscriptionName
 * @param {TBasAppSyncSubscribeOptions} [options]
 * @param {TBasAppSyncSubscribeIdContext} [idContext]
 * @returns {rxjs.Observable}
 */
BasAppSyncClient.prototype.subscribe = function (
  config,
  subscriptionName,
  options,
  idContext
) {
  var _this, _id
  var _timeoutId
  var subscriptionLinkSub, newSubscriptionLinkSub
  var _subscriptionStarted, _newSubscriptionStarted
  var tag, _debugId

  _this = this

  _subscriptionStarted = false
  _newSubscriptionStarted = false

  if (LOGGING) {

    _debugId = _getId()

    tag = 'SUBSCRIBE ' + subscriptionName + ' ' + _debugId
    // eslint-disable-next-line no-console
    console.log(tag)
  }

  return new rxjs.Observable(observableConstructor)

  /**
   * @param {rxjs.Subscriber} subscriber
   * @returns {(undefined|CBasUnsubscribe)}
   */
  function observableConstructor (subscriber) {

    if (_this._stopped) {

      subscriber.error(new BasError(
        'Stopped',
        undefined,
        'Client is stopped'
      ))

      return
    }

    _timeoutId = setTimeout(_onTimeout, 10000)

    _this._tokenGenerator().then(_onToken, _onTokenError)

    return _unsubscribe

    /**
     * @private
     * @param {string} token
     */
    function _onToken (token) {

      var strConfig

      strConfig = JSON.stringify(config)

      subscriptionLinkSub = _this._subscriptionLinkSubject
        .pipe(operators.first())
        .subscribe(_onSubscriptionLink)
      newSubscriptionLinkSub = _this._newSubscriptionLinkSubject
        .pipe(operators.first())
        .subscribe(_onNewSubscriptionLink)

      if (LOGGING) {

        // eslint-disable-next-line no-console
        console.log(
          tag + ' onToken',
          !_this._newSubscritpionLinkSubscription
        )
      }

      if (!_this._newSubscritpionLinkSubscription) {

        _this._getNewLink(token, options)
      }

      /**
       * @private
       * @param {BasAppSyncSocket} value
       */
      function _onSubscriptionLink (value) {

        if (LOGGING) {

          // eslint-disable-next-line no-console
          console.log(
            tag + ' _onSubscriptionLink',
            _this._subscriptionLink === value,
            _newSubscriptionStarted
          )
        }

        if (_this._subscriptionLink === value) {

          value.startSubscription(strConfig, token).then(
            _onSubscriptionStarted,
            _onSubscriptionStartError
          )
        }

        /**
         * @private
         * @param {string} id
         */
        function _onSubscriptionStarted (id) {

          if (LOGGING) {

            // eslint-disable-next-line no-console
            console.log(
              tag + ' _onSubscriptionLink - Sub STARTED',
              _this._subscriptionLink === value,
              _newSubscriptionStarted
            )
          }

          if (_this._subscriptionLink === value) {

            _subscriptionStarted = true

            if (_newSubscriptionStarted) {

              // Do nothing, new link won

            } else {

              _clearTimeout()

              _id = id

              if (idContext) idContext.id = _id

              // eslint-disable-next-line no-console
              if (LOGGING) console.log(tag + ' Existing channel won')

              _unsubscribeNewSubscriptionLink()
              _this._clearNewSubscriptionLinkSub()

              _this.subscriptionEvents.next({
                event: BasAppSyncClient.SE_START,
                id: _id,
                name: subscriptionName
              })

              _this._subscribers.push({
                id: _id,
                subscriber: subscriber
              })
            }
          }
        }

        function _onSubscriptionStartError (_error) {

          if (LOGGING) {

            // eslint-disable-next-line no-console
            console.log(
              tag + ' _onSubscriptionLink - Sub ERROR',
              _this._subscriptionLink === value,
              _newSubscriptionStarted,
              _error
            )
          }

          if (_this._subscriptionLink === value) {

            // TODO ?
          }
        }
      }

      /**
       * @private
       * @param {BasAppSyncSocket} value
       */
      function _onNewSubscriptionLink (value) {

        if (LOGGING) {

          // eslint-disable-next-line no-console
          console.log(
            tag + ' _onNewSubscriptionLink',
            _subscriptionStarted
          )
        }

        value.startSubscription(strConfig, token).then(
          _onSubscriptionStarted,
          _onSubscriptionStartError
        )

        /**
         * @private
         * @param {string} id
         */
        function _onSubscriptionStarted (id) {

          if (LOGGING) {

            // eslint-disable-next-line no-console
            console.log(
              tag + ' _onNewSubscriptionLink - Sub STARTED',
              _subscriptionStarted
            )
          }

          _newSubscriptionStarted = true

          if (_subscriptionStarted) {

            // Do nothing, existing channel won

          } else {

            _clearTimeout()

            _id = id

            if (idContext) idContext.id = _id

            // eslint-disable-next-line no-console
            if (LOGGING) console.log(tag + ' New channel won')

            _unsubscribeSubscriptionLink()
            _this._clearSubscriptionLinkSub()

            // Replace subscription link

            _this._setSubscriptionLink(value)

            _this.subscriptionEvents.next({
              event: BasAppSyncClient.SE_START,
              id: _id,
              name: subscriptionName
            })

            _this._subscribers.push({
              id: _id,
              subscriber: subscriber
            })
          }
        }

        function _onSubscriptionStartError (error) {

          if (LOGGING) {

            // eslint-disable-next-line no-console
            console.log(
              tag + ' _onNewSubscriptionLink - Sub ERROR',
              _subscriptionStarted,
              error
            )
          }
        }
      }
    }

    /**
     * @private
     * @param error
     */
    function _onTokenError (error) {

      subscriber.error(error)
    }

    function _unsubscribeSubscriptionLink () {

      if (subscriptionLinkSub) {

        subscriptionLinkSub.unsubscribe()
      }

      subscriptionLinkSub = null
    }

    function _unsubscribeNewSubscriptionLink () {

      if (newSubscriptionLinkSub) {

        newSubscriptionLinkSub.unsubscribe()
      }

      newSubscriptionLinkSub = null
    }

    function _clearTimeout () {

      clearTimeout(_timeoutId)
      _timeoutId = 0
    }

    function _onTimeout () {

      // eslint-disable-next-line no-console
      if (LOGGING) console.log(tag + ' TIMEOUT')

      _cleanup()

      subscriber.error(new BasError(
        'Timeout',
        undefined,
        'Failed to start subscription in time'
      ))
    }

    function _cleanup () {

      _clearTimeout()
    }

    function _unsubscribe () {

      _unsubscribeSubscriptionLink()
      _unsubscribeNewSubscriptionLink()
      _clearTimeout()

      _this._takeSubscriptionWithId(_id)

      if (_this._subscriptionLink &&
        _this._subscriptionLink.subscriptionIds.indexOf(_id) > -1) {

        _this._subscriptionLink.stopSubscription(_id)
          .then(_onStopSubscription, _onStopSubscription)
      }

      function _onStopSubscription () {

        _this.subscriptionEvents.next({
          event: BasAppSyncClient.SE_COMPLETE,
          id: _id,
          name: subscriptionName
        })
      }
    }
  }
}

BasAppSyncClient.prototype.stop = function () {

  var subscribers, length, i, subscription

  this._stopped = true

  this._tokenGenerator = null

  subscribers = this._subscribers
  this._subscribers = []

  length = subscribers.length
  for (i = 0; i < length; i++) {

    subscription = subscribers[i]

    if (subscription && subscription.subscriber) {

      subscription.subscriber.complete()
    }
  }

  this._clearSubscriptionLinkSub()
  this._clearNewSubscriptionLinkSub()
}

BasAppSyncClient.prototype._createSubscriptionLink = function () {

  var _this, _unsubscribed

  _this = this
  _unsubscribed = false

  if (!this._subscriptionLinkSubscription) {

    this._subscriptionLinkSubscription = {
      unsubscribe: _unsubscribe
    }

    this._tokenGenerator().then(_onToken, _onTokenError)
  }

  /**
   * @private
   * @param {string} token
   */
  function _onToken (token) {

    if (_unsubscribed) return

    _this._subscriptionLinkSubscription = null

    _this._getSubscriptionLink(token)
  }

  function _onTokenError () {

    if (_unsubscribed) return

    _this._subscriptionLinkSubscription = null
  }

  function _unsubscribe () {

    _unsubscribed = true
  }
}

/**
 * @private
 * @param {string} token
 */
BasAppSyncClient.prototype._getSubscriptionLink = function (token) {

  var _this

  _this = this

  if (!this._subscriptionLinkSubscription) {

    this._subscriptionLinkSubscription = createAppSyncSocket(
      this._appSyncHost,
      this._wsLink,
      token
    ).pipe(operators.first()).subscribe({
      next: _next,
      error: _error,
      complete: _complete
    })
  }

  /**
   * @private
   * @param {TBasAppSyncSocketResult} value
   */
  function _next (value) {

    // eslint-disable-next-line no-console
    if (LOGGING) console.log('_getSubscriptionLink NEXT', value)

    _this._destroySubscriptionLink()
    _this._subscriptionLinkSubscription = null

    _this._subscriptionLink = new BasAppSyncSocket(
      _this,
      value.socket,
      value.initialPayload
    )

    _this._subscriptionLinkSubject.next(_this._subscriptionLink)
  }

  function _error () {

    _this._subscriptionLinkSubscription = null
  }

  function _complete () {

    _this._subscriptionLinkSubscription = null
  }
}

/**
 * @private
 * @param {BasAppSyncSocket} socket
 */
BasAppSyncClient.prototype._setSubscriptionLink =
  function (socket) {

    if (this._subscriptionLink !== socket) {

      this._destroySubscriptionLink()
      this._clearSubscriptionLinkSub()

      if (socket && socket.isOpen()) {

        this._subscriptionLink = socket
        this._subscriptionLinkSubject.next(socket)
      }
    }
  }

/**
 * @private
 */
BasAppSyncClient.prototype._destroySubscriptionLink = function () {

  if (this._subscriptionLink) {

    this._subscriptionLink.destroy()
  }

  this._subscriptionLink = null
}

/**
 * @private
 */
BasAppSyncClient.prototype._clearSubscriptionLinkSub = function () {

  if (this._subscriptionLinkSubscription) {

    this._subscriptionLinkSubscription.unsubscribe()
  }

  this._subscriptionLinkSubscription = null
}

/**
 * @private
 */
BasAppSyncClient.prototype._clearNewSubscriptionLinkSub = function () {

  if (this._newSubscritpionLinkSubscription) {

    this._newSubscritpionLinkSubscription.unsubscribe()
  }

  this._newSubscritpionLinkSubscription = null
}

/**
 * @private
 * @param {?BasAppSyncSocket} value
 * @returns {boolean}
 */
BasAppSyncClient.prototype._subscriptionLinkFilter =
  function (value) {

    return !!(
      value &&
      this._subscriptionLink &&
      this._subscriptionLink.isOpen()
    )
  }

/**
 * @private
 * @param {?BasAppSyncSocket} value
 * @returns {boolean}
 */
BasAppSyncClient.prototype._validSubscriptionLinkFilter =
  function (value) {

    return !!(value && value.isOpen())
  }

/**
 * @private
 * @param {string} token
 * @param {TBasAppSyncSubscribeOptions} [options]
 */
BasAppSyncClient.prototype._getNewLink = function (
  token,
  options
) {

  var _this, observables, offsets, length, i, offset

  _this = this

  if (LOGGING) {

    // eslint-disable-next-line no-console
    console.log('_getNewLink', !this._newSubscritpionLinkSubscription)
  }

  if (!this._newSubscritpionLinkSubscription) {

    observables = [
      this._createAppSyncSocket(token)
    ]

    if (BasUtil.isObject(options)) {

      if (Array.isArray(options.multipleSubscriptionLinkOffsets)) {

        offsets = options.multipleSubscriptionLinkOffsets
      }
    }

    if (offsets) {

      length = offsets.length
      for (i = 0; i < length; i++) {

        offset = offsets[i]

        if (BasUtil.isPNumber(offset)) {

          observables.push(
            rxjs.timer(offset).pipe(operators.switchMapTo(
              this._createAppSyncSocket(token)
            ))
          )
        }
      }
    }

    this._newSubscritpionLinkSubscription = rxjs.merge.apply(
      null,
      observables
    ).pipe(operators.first()).subscribe({
      next: _next,
      error: _error,
      complete: _complete
    })
  }

  /**
   * @private
   * @param {TBasAppSyncSocketResult} value
   */
  function _next (value) {

    // eslint-disable-next-line no-console
    if (LOGGING) console.log('_getNewLink NEXT', value)

    _this._newSubscritpionLinkSubscription = null

    _this._newSubscriptionLinkSubject.next(new BasAppSyncSocket(
      _this,
      value.socket,
      value.initialPayload
    ))

    // Attempt to clear the last remembered element for this subject
    _this._newSubscriptionLinkSubject.next(null)
  }

  function _error (subscriptionLinkError) {

    // eslint-disable-next-line no-console
    if (LOGGING) console.log('_getNewLink ERROR', subscriptionLinkError)

    _this._newSubscritpionLinkSubscription = null
  }

  function _complete (result) {

    // eslint-disable-next-line no-console
    if (LOGGING) console.log('_getNewLink COMPLETE', result)

    _this._newSubscritpionLinkSubscription = null
  }
}

/**
 * @private
 * @param {string} token
 * @returns {rxjs.Observable<TBasAppSyncSocketResult>}
 */
BasAppSyncClient.prototype._createAppSyncSocket = function (token) {

  return createAppSyncSocket(
    this._appSyncHost,
    this._wsLink,
    token
  ).pipe(
    operators.catchError(_convertErrorToNull),
    operators.filter(_filterValid),

    // Do not use the operator "first" here
    // When an error occurs it gets transformed to an observable of "null"
    // This type of observable does not work with the "first" operator
    operators.take(1)
  )
}

/**
 * @private
 * @param {string} id
 * @returns {?TBasAppSyncSubscription}
 */
BasAppSyncClient.prototype._getSubscriptionWithId = function (id) {

  var length, i, subscription

  length = this._subscribers.length
  for (i = 0; i < length; i++) {

    subscription = this._subscribers[i]

    if (subscription && subscription.id === id) {

      return subscription
    }
  }

  return null
}

/**
 * @private
 * @param {string} id
 * @returns {?TBasAppSyncSubscription}
 */
BasAppSyncClient.prototype._takeSubscriptionWithId = function (id) {

  var length, i, subscription

  length = this._subscribers.length
  for (i = 0; i < length; i++) {

    subscription = this._subscribers[i]

    if (subscription && subscription.id === id) break
  }

  if (subscription && subscription.id === id) {

    this._subscribers.splice(i, 1)

    return subscription
  }

  return null
}

/**
 * @private
 * @param {string} id
 * @param {*} next
 */
BasAppSyncClient.prototype._nextForSubscriptionWithId = function (
  id,
  next
) {

  var subscription

  subscription = this._getSubscriptionWithId(id)

  if (subscription && subscription.subscriber) {

    subscription.subscriber.next(next)
  }
}

/**
 * @private
 * @param {string} id
 * @param {*} [complete]
 */
BasAppSyncClient.prototype._completeForSubscriptionWithId = function (
  id,
  complete
) {

  var subscription

  subscription = this._takeSubscriptionWithId(id)

  if (subscription && subscription.subscriber) {

    subscription.subscriber.complete(complete)
  }
}

/**
 * @private
 * @param {string} id
 * @param {*} error
 */
BasAppSyncClient.prototype._errorForSubscriptionWithId = function (
  id,
  error
) {

  var subscription

  subscription = this._takeSubscriptionWithId(id)

  if (subscription && subscription.subscriber) {

    subscription.subscriber.error(error)
  }
}

/**
 * @private
 * @param {string[]} ids
 * @returns {TBasAppSyncSubscription[]}
 */
BasAppSyncClient.prototype._spliceSubscriptionsWithIds = function (ids) {

  var result, currentSubscriptions, length, i, subscription

  result = []
  currentSubscriptions = this._subscribers
  this._subscribers = []

  length = currentSubscriptions.length
  for (i = 0; i < length; i++) {

    subscription = currentSubscriptions[i]

    if (subscription && BasUtil.isNEString(subscription.id)) {

      if (ids.indexOf(subscription.id) > -1) {

        // Add subscription to "spliced/removed" collection

        result.push(subscription)

      } else {

        // Keep subscription

        this._subscribers.push(subscription)
      }
    }
  }

  return result
}

/**
 * @private
 * @param {string[]} ids
 * @param {*} error
 */
BasAppSyncClient.prototype._clearSubscriptionsWithIds = function (
  ids,
  error
) {

  var toBeCleared, length, i, subscription

  toBeCleared = this._spliceSubscriptionsWithIds(ids)

  // Finish "toBeCleared" subscriptions with error

  length = toBeCleared.length
  for (i = 0; i < length; i++) {

    subscription = toBeCleared[i]

    if (subscription.subscriber) {

      subscription.subscriber.error(error)
    }
  }
}

/**
 * @constructor
 * @param {BasAppSyncClient} client
 * @param {WebSocket} webSocket
 * @param {Object} initialPayload
 */
function BasAppSyncSocket (
  client,
  webSocket,
  initialPayload
) {
  /**
   * @type {BasAppSyncClient}
   */
  this._client = client

  /**
   * @type {WebSocket}
   */
  this._ws = webSocket

  /**
   * @type {number}
   */
  this._connectionTimeoutMs = 0

  this._connectionTimeoutId = 0

  /**
   * @private
   * @type {TBasAppSyncSocketRequest[]}
   */
  this._startRequests = []

  /**
   * @private
   * @type {TBasAppSyncSocketRequest[]}
   */
  this._stopRequests = []

  /**
   * @type {string[]}
   */
  this.subscriptionIds = []

  this._wsListeners = []

  this._handleWsError = this._onWsError.bind(this)
  this._handleWsClose = this._onWsClose.bind(this)
  this._handleWsMessage = this._onWsMessage.bind(this)

  this._handleConnectionTimeout = this._onConnectionTimeout.bind(this)

  this._parseInitialPayload(initialPayload)
  this._setWebSocketListeners()
  this._init()
}

/**
 * @private
 * @param {Object[]} requests
 * @param {string} id
 * @returns {?Object}
 */
BasAppSyncSocket._takeRequestWithId = function (requests, id) {

  var length, i, request, idx

  idx = -1

  length = requests.length
  for (i = 0; i < length; i++) {

    request = requests[i]

    if (request.id === id) {

      idx = i
      break
    }
  }

  if (idx > -1) {

    requests.splice(idx, 1)

    return request
  }

  return null
}

/**
 * @private
 * @param {Object[]} requests
 * @param {*} [reason]
 */
BasAppSyncSocket._rejectAllRequests = function (requests, reason) {

  var length, i, request

  length = requests.length
  for (i = 0; i < length; i++) {

    request = requests[i]

    if (request && !request.finished) {

      request.finished = true
      request.reject(reason)
    }
  }
}

BasAppSyncSocket._onSubscriptionError = function (error) {

  var errors, length, i, _error

  if (error && Array.isArray(error[_K_ERRORS])) {

    errors = error[_K_ERRORS]
    length = errors.length
    for (i = 0; i < length; i++) {

      _error = errors[i]

      if (_error && _error[_K_ERROR_TYPE] === _E_WAF_FORBIDDEN) {

        // WAF is stopping request, stop other requests

        return Promise.reject(new BasError(
          BasAppSyncClient.ERR_WAF,
          undefined,
          'WAF exception'
        ))
      }
    }
  }

  return Promise.reject(error)
}

BasAppSyncSocket.prototype._init = function () {

  this._resetConnectionTimeout()
}

BasAppSyncSocket.prototype._parseInitialPayload = function (initialPayload) {

  var value

  if (BasUtil.isObject(initialPayload)) {

    value = initialPayload['connectionTimeoutMs']

    if (BasUtil.isPNumber(value)) {

      this._connectionTimeoutMs = value
    }
  }
}

/**
 * @param {string} config
 * @param {string} token
 * @param {number} [timeout]
 * @returns {Promise<string>}
 */
BasAppSyncSocket.prototype.startSubscription = function (
  config,
  token,
  timeout
) {

  var _this

  _this = this

  return this.isOpen()
    ? new Promise(_startSubscriptionPromiseConstructor)
      .catch(BasAppSyncSocket._onSubscriptionError)
    : Promise.reject('WebSocket is not open')

  function _startSubscriptionPromiseConstructor (resolve, reject) {

    var _id, _msg

    _id = BasUtil.uuidv4()

    _msg = {
      id: _id,
      payload: {
        data: config,
        extensions: {
          authorization: {
            Authorization: token,
            host: _this._client._appSyncHost
          }
        }
      },
      type: _AS_START
    }

    try {

      _this._ws.send(JSON.stringify(_msg))

    } catch (error) {

      return reject(error)
    }

    _this._startRequests.push({
      finished: false,
      id: _id,
      resolve: resolve,
      reject: reject,
      timeoutId: BasUtil.isPNumber(timeout)
        ? setTimeout(_onTimeout, timeout)
        : 0
    })

    function _onTimeout () {

      var request

      request = BasAppSyncSocket._takeRequestWithId(
        _this._startRequests,
        _id
      )

      if (request && !request.finished) {

        request.finished = true
        request.reject(new BasError(
          'Timeout',
          undefined,
          'Timeout after ' + timeout + 'ms'
        ))
      }
    }
  }
}

/**
 * @param {string} id
 * @returns {Promise}
 */
BasAppSyncSocket.prototype.stopSubscription = function (id) {

  var _this

  _this = this

  return this.isOpen()
    ? new Promise(_stopSubscriptionPromiseConstructor)
      .catch(BasAppSyncSocket._onSubscriptionError)
    : Promise.reject('WebSocket is not open')

  function _stopSubscriptionPromiseConstructor (resolve, reject) {

    var _msg

    _msg = {
      id: id,
      type: _AS_STOP
    }

    try {

      _this._ws.send(JSON.stringify(_msg))

    } catch (error) {

      return reject(error)
    }

    _this._stopRequests.push({
      finished: false,
      id: id,
      resolve: resolve,
      reject: reject
    })
  }
}

/**
 * @returns {boolean}
 */
BasAppSyncSocket.prototype.isOpen = function () {

  return (
    this._ws &&
    this._ws.readyState === WebSocket.OPEN
  )
}

BasAppSyncSocket.prototype._setWebSocketListeners = function () {

  this._clearWebSocketListeners()

  if (this._ws) {

    this._wsListeners.push(BasUtil.setDOMListener(
      this._ws,
      'error',
      this._handleWsError
    ))
    this._wsListeners.push(BasUtil.setDOMListener(
      this._ws,
      'close',
      this._handleWsClose
    ))
    this._wsListeners.push(BasUtil.setDOMListener(
      this._ws,
      'message',
      this._handleWsMessage
    ))
  }
}

BasAppSyncSocket.prototype._clearWebSocketListeners = function () {

  BasUtil.executeArray(this._wsListeners)
  this._wsListeners = []
}

/**
 * @private
 * @param {Event} event
 */
BasAppSyncSocket.prototype._onWsError = function (event) {

  // eslint-disable-next-line no-console
  if (LOGGING) console.log('APP_SYNC WS ERROR', event)

  if (this._client) {

    this._client._clearSubscriptionsWithIds(
      this.subscriptionIds,
      event
    )
  }
}

/**
 * @private
 * @param {CloseEvent} event
 */
BasAppSyncSocket.prototype._onWsClose = function (event) {

  // eslint-disable-next-line no-console
  if (LOGGING) console.log('APP_SYNC WS CLOSE', event)

  if (this._client) {

    this._client._clearSubscriptionsWithIds(
      this.subscriptionIds,
      event
    )
  }
}

/**
 * @private
 * @param {MessageEvent} event
 */
BasAppSyncSocket.prototype._onWsMessage = function (event) {

  var msg, request, idx

  try {

    msg = JSON.parse(event.data)

  } catch (parseError) {

    // eslint-disable-next-line no-console
    if (LOGGING) console.log('APP_SYNC INVALID WS MESSAGE', event, parseError)

    return
  }

  if (msg) {

    switch (msg.type) {
      case _AS_KA:

        this._resetConnectionTimeout()

        break
      case _AS_START_ACK:

        this.subscriptionIds.push(msg.id)

        request = BasAppSyncSocket._takeRequestWithId(
          this._startRequests,
          msg.id
        )

        if (request && !request.finished) {

          clearTimeout(request.timeoutId)
          request.finished = true
          request.resolve(msg.id)
        }

        // TODO Notify client?

        break
      case _AS_DATA:

        if (this._client) {

          this._client._nextForSubscriptionWithId(
            msg.id,
            msg.payload
          )
        }

        break
      case _AS_COMPLETE:

        idx = this.subscriptionIds.indexOf(msg.id)

        if (idx > -1) {

          this.subscriptionIds.splice(idx, 1)
        }

        if (this._client) {

          this._client._completeForSubscriptionWithId(msg.id)
        }

        request = BasAppSyncSocket._takeRequestWithId(
          this._stopRequests,
          msg.id
        )

        if (request && !request.finished) {

          clearTimeout(request.timeoutId)
          request.finished = true
          request.resolve(msg.id)
        }

        break
      case _AS_ERROR:

        if ('id' in msg) {

          idx = this.subscriptionIds.indexOf(msg.id)

          if (idx > -1) {

            this.subscriptionIds.splice(idx, 1)
          }

          if (this._client) {

            this._client._errorForSubscriptionWithId(
              msg.id,
              msg.payload
            )
          }

          // Start requests

          request = BasAppSyncSocket._takeRequestWithId(
            this._startRequests,
            msg.id
          )

          if (request && !request.finished) {

            clearTimeout(request.timeoutId)
            request.finished = true
            request.reject(msg.payload)
          }

          // Stop requests

          request = BasAppSyncSocket._takeRequestWithId(
            this._stopRequests,
            msg.id
          )

          if (request && !request.finished) {

            clearTimeout(request.timeoutId)
            request.finished = true
            request.reject(msg.payload)
          }

        } else {

          // TODO Notify client?

          // eslint-disable-next-line no-console
          if (LOGGING) console.error('AppSync WS error', msg)
        }

        break
    }
  }
}

BasAppSyncSocket.prototype._resetConnectionTimeout = function () {

  this._clearConnectionTimeout()

  if (this._connectionTimeoutMs > 0) {

    this._connectionTimeoutId = setTimeout(
      this._handleConnectionTimeout,
      this._connectionTimeoutMs
    )
  }
}

BasAppSyncSocket.prototype._onConnectionTimeout = function () {

  // eslint-disable-next-line no-console
  if (LOGGING) console.log('CONNECTION TIMEOUT')

  this._clearWebSocket()

  this._rejectAllRequests()

  this._clearAllSubscriptions('Connection timed out')
}

BasAppSyncSocket.prototype._clearConnectionTimeout = function () {

  clearTimeout(this._connectionTimeoutId)
  this._connectionTimeoutId = 0
}

/**
 * @private
 * @param {*} error
 */
BasAppSyncSocket.prototype._clearAllSubscriptions = function (error) {

  if (this._client) {

    this._client._clearSubscriptionsWithIds(
      this.subscriptionIds,
      error
    )
  }
}

BasAppSyncSocket.prototype._clearWebSocket = function () {

  this._clearConnectionTimeout()
  this._clearWebSocketListeners()

  if (this._ws) {

    try {

      this._ws.close()

    } catch (error) {

      // eslint-disable-next-line no-console
      if (LOGGING) console.log('WS close error', error)
    }
  }

  this._ws = undefined
}

BasAppSyncSocket.prototype._rejectAllRequests = function () {

  var requests

  requests = this._startRequests
  this._startRequests = []
  BasAppSyncSocket._rejectAllRequests(requests)

  requests = this._stopRequests
  this._stopRequests = []
  BasAppSyncSocket._rejectAllRequests(requests)
}

BasAppSyncSocket.prototype.destroy = function () {

  this._clearWebSocket()

  this._rejectAllRequests()

  this._clearAllSubscriptions('Subscription cancelled')

  this._client = undefined
}

/**
 * @param appSyncHost
 * @param wsUrl
 * @param token
 * @returns {rxjs.Observable<TBasAppSyncSocketResult>}
 */
function createAppSyncSocket (appSyncHost, wsUrl, token) {

  var tag, _id

  _id = _getId()

  tag = 'createAppSyncSocket ' + _id

  return new rxjs.Observable(_createAppSyncSocket)

  function _createAppSyncSocket (subscriber) {

    var _finished, _timeoutId
    var _wsUrl, _ws, _wsListeners

    _finished = false

    // eslint-disable-next-line no-console
    if (LOGGING) console.log(tag)

    if (appSyncHost && wsUrl && token) {

      _wsUrl = wsUrl + '?header=' + btoa(JSON.stringify({
        Authorization: token,
        host: appSyncHost
      })) + _WS_PATH_PAYLOAD_PARAM

      _timeoutId = setTimeout(_onTimeout, _WS_TIMEOUT)

      try {

        _ws = new WebSocket(_wsUrl, _WS_PROTOCOL)

      } catch (error) {

        _finished = true

        _cleanup()

        // eslint-disable-next-line no-console
        if (LOGGING) console.log(tag + ' error try catch WS', error)

        subscriber.error(new BasError(
          'WebSocket Error',
          error,
          'Error creating WebSocket'
        ))

        return
      }

      _setWsListener()

      return unsubscribe

    } else {

      _finished = true

      // eslint-disable-next-line no-console
      if (LOGGING) console.log(tag + ' error invalid input')

      subscriber.error('Invalid input')
    }

    /**
     * @private
     * @param {Event} event
     */
    function _onWsError (event) {

      if (_finished) return

      _finished = true

      _cleanup()

      // eslint-disable-next-line no-console
      if (LOGGING) console.log(tag + ' error WS error', event)

      subscriber.error(new BasError(
        'WebSocket Error',
        event,
        'WebSocket Error'
      ))
    }

    /**
     * @private
     * @param {CloseEvent} event
     */
    function _onWsClose (event) {

      if (_finished) return

      _finished = true

      _cleanup()

      // eslint-disable-next-line no-console
      if (LOGGING) console.log(tag + ' error WS close')

      subscriber.error(new BasError(
        'WebSocket Error',
        event,
        'WebSocket closed'
      ))
    }

    function _onWsOpen () {

      if (_finished) return

      if (_ws) {

        try {

          _ws.send(_WS_CONNECTION_INIT)

        } catch (error) {

          _finished = true

          _cleanup()

          // eslint-disable-next-line no-console
          if (LOGGING) console.log(tag + ' error WS send on open')

          subscriber.error(new BasError(
            'WebSocket Error',
            error,
            'WebSocket send error'
          ))
        }
      }
    }

    /**
     * @private
     * @param {MessageEvent} event
     */
    function _onWsMessage (event) {

      var msg, _webSocket

      if (_finished) return

      try {

        msg = JSON.parse(event.data)

      } catch (error) {

        _finished = true

        _cleanup()

        // eslint-disable-next-line no-console
        if (LOGGING) console.log(tag + ' error WS parse msg')

        subscriber.error(new BasError(
          'WebSocket Error',
          {
            event: event,
            error: error
          },
          'WebSocket receive message invalid'
        ))

        return
      }

      if (BasUtil.isObject(msg)) {

        if (msg.type === _AS_CONNECTION_ACK) {

          _finished = true

          _clearWsListeners()
          _clearTimeout()

          _webSocket = _ws
          _ws = undefined

          // eslint-disable-next-line no-console
          if (LOGGING) console.log(tag + ' NEXT ws created')

          subscriber.next({
            socket: _webSocket,
            initialPayload: msg.payload
          })
          subscriber.complete()

          _cleanup()

        } else {

          // Unknown message
        }

      } else {

        // Unknown message
      }
    }

    function _onTimeout () {

      if (_finished) return

      _finished = true

      // eslint-disable-next-line no-console
      if (LOGGING) console.log(tag + ' error TIMEOUT')

      subscriber.error(new BasError(
        'Timeout',
        undefined,
        'Timeout reached'
      ))

      _cleanup()
    }

    function _setWsListener () {

      _clearWsListeners()

      if (_ws) {

        _wsListeners = []

        _wsListeners.push(BasUtil.setDOMListener(
          _ws,
          'error',
          _onWsError
        ))
        _wsListeners.push(BasUtil.setDOMListener(
          _ws,
          'close',
          _onWsClose
        ))
        _wsListeners.push(BasUtil.setDOMListener(
          _ws,
          'open',
          _onWsOpen
        ))
        _wsListeners.push(BasUtil.setDOMListener(
          _ws,
          'message',
          _onWsMessage
        ))
      }
    }

    function _clearWsListeners () {

      BasUtil.executeArray(_wsListeners)
      _wsListeners = undefined
    }

    function _destroyWs () {

      _clearWsListeners()

      if (_ws) {

        try {

          _ws.close()

        } catch (error) {

          // Ignore
        }
      }

      _ws = undefined
    }

    function _clearTimeout () {

      clearTimeout(_timeoutId)
      _timeoutId = 0
    }

    function _cleanup () {

      _clearTimeout()
      _destroyWs()
    }

    function unsubscribe () {

      _cleanup()
    }
  }
}

/**
 * @private
 * @returns {rxjs.Observable<null>}
 */
function _convertErrorToNull () {

  return rxjs.of(null)
}

function _filterValid (value) {

  return !!value
}

export default BasAppSyncClient
