import { createAction } from 'redux-act';
import { eventChannel } from 'redux-saga';
import { put, call, takeEvery, select, all, take } from 'redux-saga/effects';
import { v4 as uuidv4 } from 'uuid';
import { selectCells, markCellAsExecuting, addCell } from './notebook.module';
import { notebookUser } from '../selectors/notebookUser.selector';
import {
  getSource,
  isCellExecutable,
} from '../../../components/workbench/fileTypes/notebook/component/notebook-cells/NotebookCellManager';
import {
  addOutputToInputVariable,
  addOutputToOutputElement,
  clearOutputOfInputOutputCell,
  clearOutputOfInputVariables,
} from './cells.app.module';
import { markCellAsExecutingApp } from './app.module';
import { selectCell, updateNotebookCells } from './utils/notebooks';
import { warning as warningNotification } from 'react-notification-system-redux';
import { kernelNotReady } from '../notifications/notifications';
import * as Api from '../../../core/api';
import { getExecutionCode } from '../../../components/workbench/fileTypes/notebook/component/notebook-cells/cell-implementations/app-cells/python3-input-cell/variables/variableOptions';

/**
 * Action creator: request that a kernel websocket is opened for the given session.
 * `parentType` falls back to 'notebook' and `serverName` to '' when they are omitted.
 */
export const openSocket = createAction(
  'open socket',
  (sessionId, kernelId, parentType = 'notebook', serverName = '') => {
    return { sessionId, kernelId, parentType, serverName };
  }
);

/**
 * Action creator: execute the cells with the given ids of the notebook at `path` via the socket of `sessionId`.
 */
export const executeCells = createAction(
  'execute cells',
  (path, sessionId, cellIds) => {
    return { path, sessionId, cellIds };
  }
);

/**
 * Action creator: reset a cell that was "executed" without anything to run
 * (the reducer clears its outputs, execution count and executing flag).
 */
export const cleanupEmptyCell = createAction(
  'cleanup empty cell',
  (path, cellId) => {
    return { path, cellId };
  }
);

/**
 * Action creator: the kernel sent an 'execute_input' message for a cell.
 * `parentType` tells whether the cell lives in a notebook or an app.
 */
export const executeInputResponse = createAction(
  'execute input',
  (path, cellId, executionCount, parentType) => {
    return { path, cellId, executionCount, parentType };
  }
);

/**
 * Action creator: append (or merge) one output received from the kernel to a cell.
 * `parentType` tells whether the cell lives in a notebook or an app.
 */
export const addOutput = createAction(
  'add output',
  (path, cellId, executionCount, output, parentType) => {
    return { path, cellId, executionCount, output, parentType };
  }
);

/**
 * Action creator: remove all outputs of a cell (emitted when the kernel sends a 'clear_output' message).
 * The description must name this action - it is what shows up in logs / dev tools - and must not be
 * confused with the 'add output' action.
 */
export const clearOutput = createAction('clear output', (path, cellId) => ({
  path,
  cellId,
}));

/**
 * Action creator: the kernel answered an execute request with status 'ok'.
 */
export const executeReplyResponseOk = createAction(
  'execute reply response ok',
  (path, cellId, executionCount, parentType) => {
    return { path, cellId, executionCount, parentType };
  }
);

/**
 * Action creator: the kernel answered an execute request with status 'error'.
 */
export const executeReplyResponseError = createAction(
  'execute reply response error',
  (path, cellId, executionCount, parentType) => {
    return { path, cellId, executionCount, parentType };
  }
);

/**
 * Action creator: store the execution state reported by the kernel of the given session.
 */
export const updateKernelState = createAction(
  'update kernel state',
  (sessionId, executionState) => {
    return { sessionId, executionState };
  }
);

/**
 * Action creator: ask the kernel for code completions at the given cursor position (row / column) of a cell.
 */
export const requestCodeCompletion = createAction(
  'request code completion',
  (path, cellId, sessionId, currentRowSource, column, row) => {
    return { path, cellId, sessionId, currentRowSource, column, row };
  }
);

/**
 * Action creator: the kernel answered a completion request; `content` is the content of the reply message.
 */
export const codeCompletionReply = createAction(
  'code completion reply',
  (path, cellId, currentRowSource, column, row, content) => {
    return { path, cellId, currentRowSource, column, row, content };
  }
);

/**
 * Action creator: drop the completion information stored for a cell.
 */
export const clearCodeCompletion = createAction(
  'clear code completion',
  (path, cellId) => {
    return { path, cellId };
  }
);

// Cell types that need a dedicated execution strategy or a dedicated treatment of the received outputs
const CELL_TYPES = {
  CREDENTIALS: 'credentials',
  INPUT: 'python3-input',
  // This is used for the "variables" part of an INPUT cell
  INPUT_VARIABLE: 'python3-input-variable',
  OUTPUT: 'python3-output',
};

/**
 * Join two strings (from a stream output) and remove terminal control characters from the result
 * @param {string} [a] First chunk - a falsy value is treated as an empty string
 * @param {string} [b] Second chunk - a falsy value is treated as an empty string
 * @returns {string} The joined and cleaned text
 */
const safeJoinAndClean = (a, b) => {
  const joined = (a ? a : '') + (b ? b : '');

  // 1. Remove every \b (backspace) together with the character in front of it
  const cleaned = joined.replace(/.[\b]/g, '');

  // 2. Remove ALL \u001b[?25. sequences (presumably the show / hide cursor escape sequences).
  //    The global flag is required - without it only the first occurrence would be removed.
  return cleaned.replace(/\u001b\[\?25./g, '');
};

/**
 * Adds a new output to the list of outputs of a cell. Two consecutive stdout stream outputs are merged into a single
 * output (text joined and cleaned) instead of being listed separately. The initial reason for doing this is to be able
 * to format the "pip install ..." output - which contains a lot of \b characters.
 * @param currentOutputs Outputs collected so far (may be empty or missing)
 * @param newOutput The output that was just received
 * @returns The new list of outputs (the unchanged input list if there is no new output)
 */
export const mergeOutputs = (currentOutputs, newOutput) => {
  if (!currentOutputs || currentOutputs.length === 0) return [newOutput];
  if (!newOutput) return currentOutputs;

  const last = currentOutputs[currentOutputs.length - 1];
  const bothStreams =
    newOutput.output_type === 'stream' && last.output_type === 'stream';
  const bothStdout =
    bothStreams && newOutput.name === 'stdout' && last.name === 'stdout';

  // Anything but "stdout follows stdout" is simply appended
  if (!bothStdout) return [...currentOutputs, newOutput];

  // Replace the latest output by the merged one
  // TODO Could other fields be dropped here?
  const merged = {
    ...last,
    ...newOutput,
    text: safeJoinAndClean(last.text, newOutput.text),
  };
  return [...currentOutputs.slice(0, -1), merged];
};

/**
 * Reducer map for the websocket related actions. Every handler is keyed by an action creator (computed key, so the
 * string form of the creator is used as the key) and receives the current state and the payload of the action.
 * Handlers that carry a parentType distinguish between cells of a notebook ('notebook') and cells of an app ('app');
 * for any other parentType the state is returned unchanged.
 */
export const reducer = {
  // An 'execute_input' message was received: store the execution count and reset the outputs of the cell
  [executeInputResponse](state, { path, cellId, executionCount, parentType }) {
    if (parentType === 'notebook') {
      const updatedFields = {
        execution_count: executionCount,
        outputs: [],
      };
      return updateNotebookCells(state, path, cellId, updatedFields);
    } else if (parentType === 'app') {
      // Find the execution plan step that contains the cell - without such a step there is nothing to update
      const executionPlanStepIndex = state.app.executionPlan.findIndex(
        (step) => step.cells && step.cells.some((c) => c.id === cellId)
      );
      if (executionPlanStepIndex < 0) return state;
      const executionPlanStep = state.app.executionPlan[executionPlanStepIndex];

      // Only the executionCount is added here. The status 'executing' was already set when submitting the execution
      //  request in the "markCellAsExecutingApp" action. If the status was set here, we would run into race conditions
      //  since the "execution finished" action might show up before the "execute input" action
      const updatedExecutionPlanStep = {
        ...executionPlanStep,
        cells: executionPlanStep.cells.map((c) =>
          c.id === cellId
            ? {
                ...c,
                execution_count: executionCount,
                outputs: [],
              }
            : c
        ),
      };

      return {
        ...state,
        app: {
          ...state.app,
          executionPlan: [
            // A. Update the execution plan
            ...state.app.executionPlan.slice(0, executionPlanStepIndex),
            updatedExecutionPlanStep,
            ...state.app.executionPlan.slice(executionPlanStepIndex + 1),
          ],
          notebook: {
            // B. Update the notebook
            ...state.app.notebook,
            content: {
              ...state.app.notebook.content,
              cells: state.app.notebook.content.cells.map((c) =>
                c.id === cellId
                  ? {
                      ...c,
                      execution_count: executionCount,
                      outputs: [],
                    }
                  : { ...c }
              ),
            },
          },
        },
      };
    } else {
      // Unknown parentType: nothing to do
      return state;
    }
  },
  // An output was received for a cell: add it to the outputs of the cell (consecutive stdout streams are merged)
  [addOutput](state, { path, cellId, executionCount, output, parentType }) {
    if (parentType === 'notebook') {
      const cell = selectCell(state, path, cellId);
      const updatedFields = {
        execution_count: executionCount,
        outputs: mergeOutputs(cell.outputs, output),
      };
      return updateNotebookCells(state, path, cellId, updatedFields);
    } else if (parentType === 'app') {
      // Find the execution plan step that contains the cell - without such a step there is nothing to update
      const executionPlanStepIndex = state.app.executionPlan.findIndex(
        (step) => step.cells && step.cells.some((c) => c.id === cellId)
      );
      if (executionPlanStepIndex < 0) return state;
      const executionPlanStep = state.app.executionPlan[executionPlanStepIndex];

      let updatedExecutionPlanStep = {
        ...executionPlanStep,
        // status: 'executing', // If one cell of the execution plan step is marked as now being executed the whole step is in state 'executing'
        cells: executionPlanStep.cells.map((c) =>
          c.id === cellId
            ? {
                ...c,
                execution_count: executionCount,
                outputs: mergeOutputs(c.outputs, output),
              }
            : c
        ),
      };

      // If the output is of type error add it to the dedicated execution errors field
      if (output.output_type === 'error') {
        updatedExecutionPlanStep = {
          ...updatedExecutionPlanStep,
          executionErrors: [
            ...(executionPlanStep.executionErrors || []),
            output,
          ],
        };
      }

      // In contrast to the other 'app' branches only the execution plan is updated here - the update of
      // state.app.notebook is commented out below
      return {
        ...state,
        app: {
          ...state.app,
          executionPlan: [
            // A. Update the execution plan
            ...state.app.executionPlan.slice(0, executionPlanStepIndex),
            updatedExecutionPlanStep,
            ...state.app.executionPlan.slice(executionPlanStepIndex + 1),
          ],
          /*
          notebook: { // B. Update the notebook
            ...state.app.notebook,
            content: {
              ...state.app.notebook.content,
              cells: state.app.notebook.content.cells.map(c => (c.id === cellId ? {
                ...c,
                execution_count: executionCount,
                outputs: mergeOutputs(c.outputs, output),
              } : { ...c })),
            },
          },
           */
        },
      };
    } else {
      // This case mustn't happen. But nothing to do in this case
      return state;
    }
  },
  // Remove all outputs of a notebook cell (this handler has no 'app' variant)
  [clearOutput]: (state, { path, cellId }) => {
    const updatedFields = {
      outputs: [],
    };
    return updateNotebookCells(state, path, cellId, updatedFields);
  },
  // Reset a notebook cell that had nothing to execute: no execution count, not executing, no outputs
  [cleanupEmptyCell]: (state, { path, cellId }) => {
    const updatedFields = {
      execution_count: undefined,
      executing: false,
      outputs: [],
    };
    return updateNotebookCells(state, path, cellId, updatedFields);
  },
  // The execution of a cell finished - both outcomes share responseStatusReducer and only differ in the success flag
  [executeReplyResponseOk]: (state, payload) =>
    responseStatusReducer(state, payload, true),
  [executeReplyResponseError]: (state, payload) =>
    responseStatusReducer(state, payload, false),
  // Store the execution state of the kernel in the notebook whose session has the given id
  [updateKernelState]: (state, { sessionId, executionState }) => {
    const notebook = Object.values(state.notebooks).find(
      (nb) => nb.session && nb.session.id === sessionId
    );
    // No notebook is attached to this session: nothing to update
    if (!notebook) return state;
    const updatedNotebook = {
      ...notebook,
      session: {
        ...notebook.session,
        kernel: {
          ...notebook.session.kernel,
          execution_state: executionState,
        },
      },
    };

    return {
      ...state,
      notebooks: {
        ...state.notebooks,
        [notebook.path]: updatedNotebook,
      },
    };
  },
  // Attach the received completions (and the cursor position they were requested for) to the cell
  [codeCompletionReply]: (
    state,
    { path, cellId, currentRowSource, column, row, content }
  ) => {
    const updatedFields = {
      completion: {
        row,
        column,
        currentRowSource,
        content,
      },
    };
    return updateNotebookCells(state, path, cellId, updatedFields);
  },
  // Drop the completion information of the cell again
  [clearCodeCompletion]: (state, { path, cellId }) => {
    const updatedFields = {
      completion: undefined,
    };
    return updateNotebookCells(state, path, cellId, updatedFields);
  },
};

/**
 * Shared reducer logic of the executeReplyResponseOk and executeReplyResponseError actions: marks the cell as no longer
 * executing and - for apps - derives the new status of the execution plan step that contains the cell.
 * @param state Current state
 * @param payload Action payload: { path, cellId, executionCount, parentType }
 * @param isSuccessful true for an 'ok' reply, false for an 'error' reply
 * @returns The updated state (the unchanged state for an unknown parentType or an unknown app cell)
 */
function responseStatusReducer(
  state,
  { path, cellId, executionCount, parentType },
  isSuccessful
) {
  if (parentType === 'notebook') {
    return updateNotebookCells(
      state,
      path,
      cellId,
      { execution_count: executionCount, executing: false },
      !isSuccessful
    );
  }

  // Neither notebook nor app: This case mustn't happen. But nothing to do in this case
  if (parentType !== 'app') return state;

  const plan = state.app.executionPlan;
  const stepIndex = plan.findIndex(
    (step) => step.cells && step.cells.some((c) => c.id === cellId)
  );
  if (stepIndex < 0) return state;
  const step = plan[stepIndex];

  const markFinished = (c) => ({
    ...c,
    execution_count: executionCount,
    executing: false,
  });
  const cells = step.cells.map((c) => (c.id === cellId ? markFinished(c) : c));

  // Status of the whole step:
  //  - 'failure' is sticky: once a cell failed, the step stays failed until it is re-executed. A failure received
  //    right now fails the step, too.
  //  - Otherwise the step is 'executing' as long as at least one executable cell is executing. The check for
  //    executable is necessary since otherwise Markdown cells (that aren't actually executed) would prevent the step
  //    to turn to 'success'.
  let status;
  if (step.status === 'failure' || !isSuccessful) {
    status = 'failure';
  } else {
    const stillExecuting = cells.some(
      (c) => isCellExecutable(c.cell_type) && c.executing
    );
    status = stillExecuting ? 'executing' : 'success';
  }

  return {
    ...state,
    app: {
      ...state.app,
      // A. Update the execution plan
      executionPlan: [
        ...plan.slice(0, stepIndex),
        { ...step, status, cells },
        ...plan.slice(stepIndex + 1),
      ],
      // B. Update the notebook
      notebook: {
        ...state.app.notebook,
        content: {
          ...state.app.notebook.content,
          cells: state.app.notebook.content.cells.map((c) =>
            c.id === cellId ? markFinished(c) : { ...c }
          ),
        },
      },
    },
  };
}

/**
 * Builds the URL of the kernel channels websocket relative to the host the page was loaded from.
 * Apps are served below /jupyterapp/user/<user>/<serverName>, everything else is treated as a notebook and served
 * below /jupyter/user/<user>.
 * @param sessionId
 * @param kernelId
 * @param jupyterUser
 * @param parentType notebook | app
 * @param serverName Only used for parentType 'app'
 * @returns {string}
 */
function getSocketUrl(
  sessionId,
  kernelId,
  jupyterUser,
  parentType,
  serverName
) {
  const scheme = location.protocol.includes('https') ? 'wss://' : 'ws://';
  const host = location.hostname + (location.port ? ':' + location.port : '');
  const userRoot =
    parentType === 'app'
      ? `/jupyterapp/user/${jupyterUser}/${serverName}`
      : `/jupyter/user/${jupyterUser}`;

  return `${scheme}${host}${userRoot}/api/kernels/${kernelId}/channels?session_id=${sessionId}`;
}

// Websockets created by initWebsocket, keyed by the session id. The binding is re-assigned when a socket is removed
// after a shutdown_reply - hence the mutable export.
// eslint-disable-next-line import/no-mutable-exports
export let openSockets = {};
// Bookkeeping of the requests sent to the kernel, keyed by the msg_id of the request. The onmessage handler of the
// socket uses it to map a response (via parent_header.msg_id) back to the path / cell / parentType of the request.
// NOTE(review): nothing in the visible code removes entries again - confirm whether this may grow unbounded.
let sentMessages = {};

/**
 * Sets up a websocket connection for a notebook session - this connection is used to execute cells and receive the
 * resulting status / outputs. The socket is registered in openSockets (keyed by the session id) and every relevant
 * kernel message is translated into a redux action that is emitted through the returned event channel.
 *
 * Responses are mapped back to their request via parent_header.msg_id and the sentMessages bookkeeping. Messages
 * that need this request information but belong to an unknown request are ignored.
 * @param sessionId
 * @param kernelId
 * @param jupyterUser
 * @param parentType notebook | app
 * @param serverName
 * @returns {*} The event channel emitting the redux actions
 */
function initWebsocket(
  sessionId,
  kernelId,
  jupyterUser,
  parentType,
  serverName
) {
  return eventChannel((emitter) => {
    const socketUrl = getSocketUrl(
      sessionId,
      kernelId,
      jupyterUser,
      parentType,
      serverName
    );
    const socket = new WebSocket(socketUrl);
    openSockets[sessionId] = socket;

    // Handle socket messages
    socket.onmessage = (messageEvent) => {
      const message = JSON.parse(messageEvent.data);
      // console.log('Received message', message);
      const messageType = message.header.msg_type;
      const parentMsgId = message.parent_header.msg_id;
      const parentMsg = sentMessages[parentMsgId];
      // app / notebook - taken from the request this message responds to (NOT the parentType of the socket)
      const msgParentType = parentMsg?.parentType;

      switch (messageType) {
        case 'status': {
          // This determines the status of the kernel ('busy' or 'idle') -> This is shown in the icon next to the Kernel Info
          // const sessionIdParentMsg = message.parent_header.session;
          const executionState = message.content.execution_state;

          if (executionState === 'restarting') {
            // This means the kernel was restarted due to a failure, for example since the memory limit was reached
            // and the Kernel process was killed
            alert('Kernel had to be restarted');
            return emitter(updateKernelState(sessionId, 'starting')); // Is 'starting' a good kernel_state? Or is 'ready' better?
          } else {
            return emitter(updateKernelState(sessionId, executionState));
          }
        }
        case 'execute_input': {
          // Basically only clears the output of the (code) cell
          if (!msgParentType) break;
          return emitter(
            executeInputResponse(
              parentMsg.path,
              parentMsg.cellId,
              message.content.execution_count,
              msgParentType
            )
          );
        }
        case 'execute_reply': {
          if (!msgParentType) break;
          if (message.content.status === 'ok') {
            return emitter(
              executeReplyResponseOk(
                parentMsg.path,
                parentMsg.cellId,
                message.content.execution_count,
                msgParentType
              )
            );
          }
          if (message.content.status === 'error') {
            return emitter(
              executeReplyResponseError(
                parentMsg.path,
                parentMsg.cellId,
                message.content.execution_count,
                msgParentType
              )
            );
          }
          break;
        }
        case 'execute_result': // Fall-through intended
        case 'stream': // Fall-through intended
        case 'display_data': // Fall-through intended
        case 'error': {
          if (!msgParentType) break;
          const output = message.content;
          output.output_type = messageType;

          // Distinguish between regular and output cells
          if (parentMsg.cellType === CELL_TYPES.OUTPUT) {
            const elementId = parentMsg.elementId;
            return emitter(
              addOutputToOutputElement(
                parentMsg.path,
                parentMsg.cellId,
                elementId,
                output,
                msgParentType
              )
            );
          } else if (parentMsg.cellType === CELL_TYPES.INPUT_VARIABLE) {
            const variableId = parentMsg.variableId;
            return emitter(
              addOutputToInputVariable(
                parentMsg.path,
                parentMsg.cellId,
                variableId,
                output,
                msgParentType
              )
            );
          } else {
            // Code Cell or Input Cell ("msgParentType" distinguishes whether the cell was executed in a notebook or an app)
            return emitter(
              addOutput(
                parentMsg.path,
                parentMsg.cellId,
                message.content.execution_count,
                output,
                msgParentType
              )
            );
          }
        }
        case 'complete_reply': {
          // Without the request there is no cell / cursor position the completions could be attached to
          if (!parentMsg) break;
          const completions = message.content;
          return emitter(
            codeCompletionReply(
              parentMsg.path,
              parentMsg.cellId,
              parentMsg.currentRowSource,
              parentMsg.column,
              parentMsg.row,
              completions
            )
          );
        }
        case 'clear_output': {
          // Without the request it is unknown which cell should be cleared
          if (!parentMsg) break;
          return emitter(clearOutput(parentMsg.path, parentMsg.cellId));
        }
        case 'shutdown_reply': {
          const { status, restart } = message.content;
          if (status === 'ok' && !restart) {
            // The kernel was shutdown (probably via a delete session http call by us)
            // Give it some time to handle other messages. For example for currently executing cells we will receive
            // an execute_reply message with message.content.status === 'error' to show that execution was interrupted
            setTimeout(() => {
              socket.close();
              const { [sessionId]: closedSocket, ...stillOpenSockets } =
                openSockets;
              openSockets = stillOpenSockets;
            }, 1000);
            return;
          } else if (status !== 'ok') {
            // Kernel restarts work through the same session and have restart = true, so those are expected
            console.log(
              'Unexpected message content for shutdown_reply websocket message: ',
              message.content
            );
          }
          break;
        }
        default: {
          // console.log(`unknown message type: ${messageType}`);
        }
      }
    };

    // No-op unsubscribe: closing the channel does not close the socket (it is only closed after a shutdown_reply)
    return () => {};
  });
}

/**
 * Saga that opens the websocket used for the code execution of a session and then forwards every action emitted by
 * the websocket channel to the store. The forwarding loop never terminates on its own.
 * @param action openSocket action - its payload carries sessionId, kernelId, parentType (notebook | app) and serverName
 * @returns {Generator<<"TAKE", TakeEffectDescriptor>|<"CALL", CallEffectDescriptor>|<"SELECT", SelectEffectDescriptor>|<"PUT", PutEffectDescriptor<*>>, void, *>}
 */
export function* openSocketSaga({
  payload: { sessionId, kernelId, parentType, serverName },
}) {
  // The user is part of the websocket url
  const user = yield select((state) => notebookUser(state));
  const socketChannel = yield call(
    initWebsocket,
    sessionId,
    kernelId,
    user,
    parentType,
    serverName
  );

  // Dispatch whatever the websocket channel emits
  for (;;) {
    yield put(yield take(socketChannel));
  }
}

/**
 * Watcher saga: runs openSocketSaga for every dispatched openSocket action.
 */
export function* watchOpenSocket() {
  // Ask the action creator for its type instead of creating a throw-away action just to read the type from it
  yield takeEvery(openSocket.getType(), openSocketSaga);
}

/**
 * Tells whether code can be sent through the given socket: there must be a socket and it must not be connecting,
 * closing or closed.
 * @param socket The websocket of a session (may be missing)
 * @returns {boolean}
 */
function isSocketReadyToExecute(socket) {
  if (!socket) return false;

  const notReadyStates = [
    WebSocket.CONNECTING,
    WebSocket.CLOSED,
    WebSocket.CLOSING,
  ];
  return !notReadyStates.includes(socket.readyState);
}

/**
 * Saga that executes the given cells of a notebook one after another through the socket of the session.
 * If the socket isn't ready a "kernel not ready" warning is shown instead.
 * @param action executeCells action - its payload carries path, sessionId and cellIds
 */
export function* executeCellsSaga({ payload: { path, sessionId, cellIds } }) {
  const socket = openSockets[sessionId];

  if (!isSocketReadyToExecute(socket)) {
    yield put(warningNotification(kernelNotReady()));
    return;
  }

  for (const cellId of cellIds) {
    yield call(executeSingleCellById, path, socket, sessionId, cellId);
  }
}

/**
 * Watcher saga: runs executeCellsSaga for every dispatched executeCells action.
 */
export function* watchExecuteCells() {
  const executeCellsType = executeCells.getType();
  yield takeEvery(executeCellsType, executeCellsSaga);
}

/**
 * Looks up the cell with the given id (and the kernel name from the kernelspec metadata) in the notebook at the given
 * path and then executes this cell as a notebook cell.
 * @param path
 * @param socket
 * @param sessionId
 * @param cellId
 * @returns {Generator<<"SELECT", SelectEffectDescriptor>, <"CALL", CallEffectDescriptor>, *>}
 */
export function* executeSingleCellById(path, socket, sessionId, cellId) {
  const notebookOf = (state) => state.workbench.notebooks[path];
  const findCell = (state) =>
    notebookOf(state).content.cells.find(({ id }) => id === cellId);
  const findKernelName = (state) =>
    notebookOf(state).content?.metadata?.kernelspec?.name;

  const cell = yield select(findCell);
  const kernel = yield select(findKernelName);

  const executionArgs = [path, socket, sessionId, cellId, cell, 'notebook', kernel];
  yield call(executeSingleCell, ...executionArgs);
}

/**
 * Builds the code that makes the fetched data source credentials available inside the kernel.
 * NOTE(review): the credentials are interpolated into the code without any escaping - a value containing a double
 * quote or a backslash results in broken (or unintended) code. Consider escaping them for the target language.
 * @param kernel Name of the kernel (ir | python3 | sparkkernel | pysparkkernel)
 * @param dsType Type of the data source (s3 | cassandra)
 * @param credentials Configuration of the credentials cell ({ username, password } = names of the variables to set)
 * @param response The fetched credentials ({ username, password })
 * @returns {string} The code to execute - an empty string if the kernel / data source type isn't supported
 */
function buildCredentialsSource(kernel, dsType, credentials, response) {
  const cassandraSparkConf = () =>
    [
      `spark.conf.set("spark.cassandra.auth.username", "${response.username}")`,
      `spark.conf.set("spark.cassandra.auth.password", "${response.password}")`,
    ].join('\n');

  switch (kernel) {
    case 'ir': // Fall-through intended: The same assignment code is generated for both kernels
    case 'python3':
      return `${credentials?.username} = "${response.username}"\n${credentials?.password} = "${response.password}"`;
    case 'sparkkernel':
      if (dsType === 's3') {
        return [
          `spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "${response.username}")`,
          `spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "${response.password}")`,
        ].join('\n');
      }
      return dsType === 'cassandra' ? cassandraSparkConf() : '';
    case 'pysparkkernel':
      if (dsType === 's3') {
        return [
          `sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "${response.username}")`,
          `sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key","${response.password}")`,
        ].join('\n');
      }
      return dsType === 'cassandra' ? cassandraSparkConf() : '';
    default:
      return '';
  }
}

/**
 * Saga that executes a single cell. Depending on the type of the cell a different execution strategy is used:
 * credentials cells fetch the credentials and execute generated code, app output cells execute their elements,
 * everything else executes the source of the cell. When executing in a notebook, the next cell is selected afterwards
 * (or a new cell is added if the executed cell was the last one and wasn't empty).
 * @param path
 * @param socket
 * @param sessionId
 * @param cellId
 * @param cell
 * @param parentType notebook | app Does this execution refer to a notebook or an app?
 * @param kernel Name of the kernel - only used to generate the code of credentials cells
 * @returns {Generator<<"PUT", PutEffectDescriptor<Action<{path: *, cellsToSelect: *}, {}>>>|<"PUT", PutEffectDescriptor<Action<{path: unknown, cellId: unknown}, {}>>>|<"PUT", PutEffectDescriptor<Action<{path: *, index: *}, {}>>>|<"PUT", PutEffectDescriptor<Action<{path: *, executingCell: *}, {}>>>|<"SELECT", SelectEffectDescriptor>, void, ?>}
 */
export function* executeSingleCell(
  path,
  socket,
  sessionId,
  cellId,
  cell,
  parentType,
  kernel
) {
  if (!isSocketReadyToExecute(socket)) {
    yield put(warningNotification(kernelNotReady()));
    return;
  }

  if (!['notebook', 'app'].includes(parentType)) {
    console.log('parentType must be notebook|app, but was: ', parentType);
    return;
  }

  // If it's a code cell: Simply get the source, if it's an app cell: Derive the code from the configuration
  const source = getSource(cell);
  const type = cell.cell_type;
  const executable = isCellExecutable(type);

  // Decide for an execution strategy. The default case is to simply execute the source field of the cell
  switch (type) {
    case CELL_TYPES.CREDENTIALS: {
      // A cell without credentials configuration has no data source type -> nothing is fetched / executed
      const asCredentials = cell.as_credentials || {};
      const credentials = asCredentials.cassandra || asCredentials.s3;
      const dsType = Object.keys(asCredentials)[0];

      let fetched;
      if (dsType === 's3') {
        fetched = yield call(
          Api.data.fetchS3Credentials,
          credentials?.dataSourceCode
        );
      } else if (dsType === 'cassandra') {
        fetched = yield call(
          Api.data.fetchCassandraCredentials,
          credentials?.dataSourceCode
        );
      }

      let response = fetched?.response;
      if (fetched && !response) {
        // The fetch failed: There is nothing that could be executed
        console.log(
          'Could not fetch the credentials for the credentials cell: ',
          fetched.error
        );
      }
      if (response && dsType === 's3') {
        // Only map the S3 keys if the fetch really returned something
        response = {
          username: response.accessKey,
          password: response.secretKey,
        };
      }

      if (response) {
        yield call(
          executeSourceCell,
          path,
          socket,
          sessionId,
          cellId,
          buildCredentialsSource(kernel, dsType, credentials, response),
          executable,
          type,
          parentType
        );
      }
      break;
    }
    case CELL_TYPES.OUTPUT: {
      // This is an python3 app output cell - for this cell the elements need to be executed one after another
      //   (there is no common "source")
      yield call(
        executeOutputCell,
        path,
        socket,
        sessionId,
        cellId,
        cell.as_elements || [],
        type,
        parentType
      );
      break;
    }
    case CELL_TYPES.INPUT: // Fall-through intended
    // This is an python3 app input cell. Execute the cell as usual.
    // -> DON'T execute the input cell variables. They are executed after hitting the "Next Step" button.
    default: {
      // Regular "Code" cell: Simply execute it. Nothing special.
      yield call(
        executeSourceCell,
        path,
        socket,
        sessionId,
        cellId,
        source,
        executable,
        type,
        parentType
      );
    }
  }

  // Now that the execution is triggered, select next cell (only required if the execution takes place in a notebook)
  if (parentType === 'notebook') {
    const cells = yield select(
      (state) => state.workbench.notebooks[path].content.cells
    );
    const index = cells.findIndex((c) => c.id === cellId);
    if (index < cells.length - 1) {
      // There is a next cell -> select it
      const nextId = cells[index + 1].id;
      yield put(selectCells(path, [nextId]));
    } else if (source && source.trim() !== '') {
      // There is no next cell and the current cell was not empty -> add a new cell (that will automatically be
      // selected then). Without checking whether the source of the cell just executed is empty there would be new
      // cells added permanently to the end of the notebook
      yield put(addCell(path, index + 1));
    }
  }
}

/**
 * Saga that executes a cell with a plain "source" (like a Code-Cell): The cell is marked as executing, the request is
 * remembered in sentMessages (to be able to treat the responses later) and an execute_request is sent through the
 * socket. A cell that isn't executable or has an empty source is cleaned up instead.
 * @param path
 * @param socket
 * @param sessionId
 * @param cellId
 * @param source
 * @param executable
 * @param cellType
 * @param parentType app|notebook
 * @returns {Generator<<"PUT", PutEffectDescriptor<Action<{path: unknown, cellId: unknown}, {}>>>|<"PUT", PutEffectDescriptor<Action<{path: *, executingCell: *}, {}>>>, void, *>}
 */
export function* executeSourceCell(
  path,
  socket,
  sessionId,
  cellId,
  source,
  executable,
  cellType,
  parentType
) {
  const hasCodeToRun = Boolean(executable && source && source.trim() !== '');
  if (!hasCodeToRun) {
    // Nothing to execute: Clean the output of the cell and mark it as not executing.
    yield put(cleanupEmptyCell(path, cellId));
    return;
  }

  // Mark the cell as executing (in notebook mode this simply adds the "*" to the flap on the left)
  if (parentType === 'notebook') {
    yield put(markCellAsExecuting(path, cellId));
  } else if (parentType === 'app') {
    yield put(markCellAsExecutingApp(path, cellId));
  }

  // Remember what was requested - the responses only refer to the id of this message
  const msgId = uuidv4();
  sentMessages[msgId] = {
    path,
    cellId,
    cellType,
    elementId: undefined, // Just to make it explicit - we're executing a cell and not just an element of a cell here.
    parentType,
  };

  const content = {
    allow_stdin: true,
    code: source,
    silent: false,
    stop_on_error: true,
    store_history: true,
    user_expressions: {},
  };
  const header = {
    msg_id: msgId,
    msg_type: 'execute_request',
    session: sessionId,
    username: '',
    version: '5.2',
  };
  const request = {
    buffers: [],
    channel: 'shell',
    content,
    header,
    metadata: { cellId, deletedCells: [] },
    parent_header: {},
  };

  socket.send(JSON.stringify(request));
}

/**
 * Saga that executes an output cell of an App: The outputs of the cell are cleared, then an execute_request is sent
 * for every element that has a source. Each request is remembered in sentMessages together with the id of its
 * element, so the responses can be assigned to the element later.
 * @param path
 * @param socket
 * @param sessionId
 * @param cellId
 * @param elements
 * @param cellType
 * @param parentType app|notebook
 * @returns {Generator<*, void, *>}
 */
export function* executeOutputCell(
  path,
  socket,
  sessionId,
  cellId,
  elements,
  cellType,
  parentType
) {
  yield put(clearOutputOfInputOutputCell(path, cellId, parentType));

  // Sends the request for one element - elements without a source are skipped
  const requestExecution = (element) => {
    const code = element.data && element.data.source;
    if (!code) return undefined;

    // Remember what was requested - the responses only refer to the id of this message
    const msgId = uuidv4();
    sentMessages[msgId] = {
      path,
      cellId,
      cellType,
      elementId: element.id,
      parentType,
    };

    const request = {
      buffers: [],
      channel: 'shell',
      content: {
        allow_stdin: true,
        code,
        silent: false,
        stop_on_error: true,
        store_history: true,
        user_expressions: {},
      },
      header: {
        msg_id: msgId,
        msg_type: 'execute_request',
        session: sessionId,
        username: '',
        version: '5.2',
      },
      metadata: { cellId, deletedCells: [] },
      parent_header: {},
    };
    socket.send(JSON.stringify(request));
    return undefined;
  };

  yield all(elements.map((element) => requestExecution(element)));
}

/**
 * Executes the variable fields of the input cell variables.
 *
 * Does nothing when there are no variables. Otherwise dispatches
 * `clearOutputOfInputVariables` and sends one `execute_request` message for
 * every variable that has both a `name` and a `type`, using the code returned
 * by `getExecutionCode(variable)`. Each sent message is registered in
 * `sentMessages` under its message id (together with the variable id).
 *
 * @param path Forwarded to the dispatched action and stored with each tracked message
 * @param socket Object whose `send` method receives the serialized messages
 * @param sessionId Written into the `session` field of each message header
 * @param cellId Id of the cell; stored with each tracked message and in the message metadata
 * @param variables Variables of the input cell
 * @param parentType Forwarded to the dispatched action and stored with each tracked message
 * @returns {Generator<*, void, *>}
 */
export function* executeInputCellVariables(
  path,
  socket,
  sessionId,
  cellId,
  variables,
  parentType
) {
  // Without variables there is nothing to clear or to execute.
  const hasVariables = Boolean(variables) && variables.length !== 0;
  if (!hasVariables) return;

  yield put(clearOutputOfInputVariables(path, cellId, parentType));

  const cellType = CELL_TYPES.INPUT_VARIABLE;

  // Sends the request for a single variable; incomplete variables are skipped.
  const sendVariableRequest = (variable) => {
    if (!(variable.name && variable.type)) return;

    const code = getExecutionCode(variable);

    // Register the request under its message id so the response can be
    // treated later on.
    const requestId = uuidv4();
    sentMessages = Object.assign(sentMessages, {
      [requestId]: {
        path,
        cellId,
        cellType,
        variableId: variable.id,
        parentType,
      },
    });

    const content = {
      allow_stdin: true,
      code,
      silent: false,
      stop_on_error: true,
      store_history: true,
      user_expressions: {},
    };
    const header = {
      msg_id: requestId,
      msg_type: 'execute_request',
      session: sessionId,
      username: '',
      version: '5.2',
    };
    const metadata = {
      cellId,
      deletedCells: [],
    };
    const request = {
      buffers: [],
      channel: 'shell',
      content,
      header,
      metadata,
      parent_header: {},
    };

    socket.send(JSON.stringify(request));
  };

  yield all(variables.map(sendVariableRequest));
}

/**
 * Saga that handles a code completion request action: unpacks the action
 * payload and yields a `call` effect for `sendRequestCodeCompletion` with the
 * payload values as positional arguments.
 * @param action Action whose payload holds path, cellId, sessionId, currentRowSource, column and row
 */
export function* requestCodeCompletionSaga({
  payload: { path, cellId, sessionId, currentRowSource, column, row },
}) {
  // Positional arguments in the order sendRequestCodeCompletion expects them.
  const completionArgs = [
    path,
    cellId,
    sessionId,
    currentRowSource,
    column,
    row,
  ];

  yield call(sendRequestCodeCompletion, ...completionArgs);
}

/**
 * Yields a `takeEvery` effect that pairs the `requestCodeCompletion` action
 * type with `requestCodeCompletionSaga`.
 */
export function* watchRequestCodeCompletion() {
  const completionActionType = requestCodeCompletion.getType();
  yield takeEvery(completionActionType, requestCodeCompletionSaga);
}

/**
 * Sends a message through the WebSocket channel that requests for code completion.
 *
 * The socket is looked up in `openSockets` by session id; when there is none,
 * nothing is sent. Otherwise the request details are registered in
 * `sentMessages` under the message id and a `complete_request` message is
 * sent over the socket.
 * @param path Stored with the tracked message
 * @param cellId Stored with the tracked message
 * @param sessionId Key of the socket in `openSockets`; also written into the message header
 * @param currentRowSource Sent as the `code` of the completion request
 * @param column Sent as the `cursor_pos` of the completion request
 * @param row Stored with the tracked message
 */
export function sendRequestCodeCompletion(
  path,
  cellId,
  sessionId,
  currentRowSource,
  column,
  row
) {
  const sessionSocket = openSockets[sessionId];
  if (!sessionSocket) return;

  // Register the request under its message id so the response can be treated
  // later on.
  const requestId = uuidv4();
  sentMessages = Object.assign(sentMessages, {
    [requestId]: {
      path,
      currentRowSource,
      column,
      row,
      cellId,
    },
  });

  // Assemble the complete_request; the key order of each object determines
  // the serialized JSON, so it is kept stable.
  const content = {
    code: currentRowSource,
    cursor_pos: column,
  };
  const header = {
    date: new Date().toISOString(),
    msg_id: requestId,
    msg_type: 'complete_request',
    session: sessionId,
    username: '',
    version: '5.2',
  };
  const request = {
    buffers: [],
    channel: 'shell',
    content,
    header,
    metadata: {},
    parent_header: {},
  };

  sessionSocket.send(JSON.stringify(request));
}
