const {v4: uuidv4} = require('uuid');
const {MQTTAccessPoint, API_VERSION, QOS} = require('./iot_engine');
const {partial, generate_mqtt_suffix, sleep, stat_obj} = require('./util');
const ACK_TIMEOUT = 5;
const PUBLISH_INT_SEC = 90;

var requests = {};
var to_ack = {};

const _ack_callback = async (mqtt_ap_obj, topic, payload, dup, qos, retain) => {
  const decoder = new TextDecoder('utf8');
  const json = decoder.decode(payload);
  let message;
  const now_ts = Date.now();
  try {
    message = JSON.parse(json);
  } catch (error) {
    console.log(
      `[ERROR] Parsing ACK message as JSON...\nmsg:${json}\nerror: ${error}`
    );
  }
  if (!(message.rid in requests)) {
    console.log(
      'Received ACK for a message that is not in requests list. Ignoring ack'
    );
    return;
  } else {
    requests[message.rid].setAckedTime = now_ts - requests[message.rid].rTime;
    if (!requests[message.rid].acked) {
      mqtt_ap_obj.publish_stat({
        ...requests[message.rid].stat_metadata(),
        ...stat_obj(
          'acked_time',
          'Elapsed round trip from submitting request to receiving ack.',
          'second',
          'msc_instance',
          requests[message.rid].ackedTime / 1000.0,
          mqtt_ap_obj.connection.config.client_id
        ),
      });
    }
    requests[message.rid].setAcked = true;
    console.log(`ACKED ${message.rid}`);
    delete to_ack[message.rid];
  }
};

const _on_resp_publish = async (
  mqtt_ap_obj,
  robj,
  user_callback,
  request_type,
  topic,
  payload,
  dup,
  qos,
  retain
) => {
  const decoder = new TextDecoder('utf8');
  robj.setRespTime = Date.now() - robj.rTime;
  let message;
  try {
    message = JSON.parse(decoder.decode(payload));
  } catch (error) {
    console.log(
      `[ERROR] Parsing message as JSON...\nmsg:${payload}\nerror: ${error}`
    );
    return;
  }

  if (user_callback) {
    if (request_type === 'ros_service') {
      await mqtt_ap_obj.publish_stat({
        ...robj.stat_metadata(),
        ...stat_obj(
          'resp_time',
          'Elapsed round trip from submitting request to receiving response (subscribe, service, param_get).',
          'second',
          'msc_instance',
          robj.respTime / 1000.0,
          mqtt_ap_obj.connection.config.client_id
        ),
      });
      await user_callback(message);
      mqtt_ap_obj.unsubscribe_mqtt(
        [
          mqtt_ap_obj.mqtt_topic_prefix,
          robj.request_type,
          generate_mqtt_suffix(robj.ros_topic),
        ].join('/')
      );
    } else {
      await user_callback(message.data);
      // unsubscribe from subscriptions that are related to Actions!
      if (
        robj.ros_topic === '/robot_arm/control_agitator/result' ||
        robj.ros_topic === '/robot_arm/control_gripper/result'
      ) {
        mqtt_ap_obj.unsubscribe_mqtt(
          [
            mqtt_ap_obj.mqtt_topic_prefix,
            robj.request_type,
            generate_mqtt_suffix(robj.ros_topic),
          ].join('/')
        );
      }
    }
  }
};

class ThingMQTTAccessPoint extends MQTTAccessPoint {
  // class constructor
  constructor(
    mqtt_connection,
    thing_name,
    product,
    user_id,
    build_mqtt_connection_time
  ) {
    super(mqtt_connection);

    this.mqtt_topic_prefix = `cmd/${thing_name}/${product}/${API_VERSION}`;
    this.stat_topic = this.mqtt_topic_prefix + '/stats';
    this.startup_ts = Date.now();
    this.user = user_id;
    this.build_conn_time = build_mqtt_connection_time;
    this.publish_stat({
      rTime: this.startup_ts,
      stat: 'build_conn_time',
      value: build_mqtt_connection_time,
      description: 'Elapsed time for building MQTT connection',
      unit: 's',
      producer: 'msc_instance',
      user: user_id,
      client_id: mqtt_connection.config.client_id,
    });
  }

  async publish_stat(obj) {
    console.log(`Publishing stat: ${obj.stat}`);
    this.connection.publish(this.stat_topic, obj, QOS);
  }
  async subscribe_to_ack() {
    const ack_callback = partial(_ack_callback, this);
    let ack_topic = [this.mqtt_topic_prefix, '+', 'ack'].join('/');
    await this._subscribe_to_mqtt(ack_topic, ack_callback);
  }

  async wait_for_acks() {
    return new Promise(async (resolve, reject) => {
      let now = Date.now();
      while (
        Object.keys(to_ack).length !== 0 &&
        (Date.now() - now) / 1000 < ACK_TIMEOUT
      ) {
        console.log('Waiting for acks...');
        await sleep(100);
      }
      if (Object.keys(to_ack).length !== 0) {
        reject();
      } else {
        this.publish_stat({
          rTime: Date.now(),
          stat: 'startup_time',
          value: this.build_conn_time + (Date.now() - this.startup_ts) / 1000,
          description: 'Elapsed time establishing the connection to site',
          unit: 's',
          producer: 'msc_instance',
          user: this.user,
          client_id: this.connection.config.client_id,
        });
        resolve();
      }
    });
  }

  async clearIntervals() {
    for (const rid in requests) {
      const cur = requests[rid];
      clearInterval(cur.int_id);
    }
    for (const topic in this.subscriptions) {
      this._unsubscribe_to_mqtt(topic).catch((error) => {
        console.log(`Unsubscribing ${topic} failed: ${error}`);
      });
    }
    requests = {};
    to_ack = {};
    this.subscriptions = {};
  }

  // publish the ROS request on MQTT topic
  async submit_request(robj, user_callback, ack_required = false) {
    return new Promise(async (resolve, reject) => {
      robj.setRid = uuidv4();
      robj.setAcked = false;
      const json = JSON.stringify(robj.getJson());
      let topic = this.mqtt_topic_prefix + '/' + robj.request_type;
      let published = false;
      let subscribed = false;
      // add request
      requests[robj.rid] = robj;
      to_ack[robj.rid] = robj;
      console.log(`INFO: Submitting (publishing) ${json} on ${topic}`);
      try {
        robj.setRTime = Date.now();
        if (user_callback) {
          const resp_callback = partial(
            _on_resp_publish,
            this,
            robj,
            user_callback,
            robj.request_type
          );
          await this._subscribe_to_mqtt(
            topic + '/' + generate_mqtt_suffix(robj.ros_topic),
            resp_callback
          )
            .then(() => {
              subscribed = true;
            })
            .catch((error) => {
              reject(' > submit_request: ' + error);
            });
        }

        // let the request for subscribe publish frequently
        await this._publish_to_mqtt(topic, json)
          .then(() => {
            published = true;
          })
          .catch((error) => {
            reject(' > submit_request: ' + error);
          });

        if (robj.request_type === 'ros_subscribe') {
          robj.setIntId = setInterval(async () => {
            while (document.hidden) {
              await sleep(5000);
            }
            robj.setAcked = false;
            to_ack[robj.rid] = robj;
            robj.setRTime = Date.now();
            this._publish_to_mqtt(topic, json)
              .then(() => {
                console.log('INFO: Interval Published!');
              })
              .catch((error) => {
                reject(' > submit_request (interval): ' + error);
              });
          }, PUBLISH_INT_SEC * 1000);
        }

        if (ack_required) {
          let now = Date.now();
          while (!robj.acked && (Date.now() - now) / 1000 < ACK_TIMEOUT) {
            console.log('INFO: Waiting for ack...');
            await sleep(10);
          }
          if (!robj.acked) {
            reject(' > submit_request (ack timeout)');
          }
        }
        if (
          (!user_callback && !published) ||
          (user_callback && (!published || !subscribed))
        ) {
          reject(
            '> Publish' + (user_callback ? ' & subscribe' : '') + ' failed!'
          );
        } else {
          resolve();
        }
      } catch (error) {
        reject(error);
      }
    });
  }
}

export {ThingMQTTAccessPoint};
