Skip to content

Slack Module

The slack module provides functions for Slack notification and alerting integration.

slack

Classes

SlackUtilMeta

Bases: type

Source code in src/mas/devops/slack.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
class SlackUtilMeta(type):
    def __init__(cls, *args, **kwargs):
        # Exposed by the client() property method
        cls._client = None

    @property
    def client(cls) -> WebClient:
        """
        Get or create the Slack WebClient instance.

        Lazily initializes the Slack client using the SLACK_TOKEN environment variable.

        Returns:
            WebClient: The Slack WebClient instance

        Raises:
            Exception: If SLACK_TOKEN environment variable is not set
        """
        if cls._client is not None:
            return cls._client
        else:
            SLACK_TOKEN = os.getenv("SLACK_TOKEN")
            if SLACK_TOKEN is None:
                logger.warning("SLACK_TOKEN is not set")
                raise Exception("SLACK_TOKEN is not set")
            else:
                cls._client = WebClient(token=SLACK_TOKEN)
                return cls._client

    def postMessageBlocks(cls, channelList: str | list[str], messageBlocks: list, threadId: str = None) -> SlackResponse | list[SlackResponse]:
        """
        Post a message with block formatting to one or more Slack channels.

        Parameters:
            channelList (str | list[str]): Single channel ID/name or list of channel IDs/names
            messageBlocks (list): List of Slack block kit elements defining the message structure
            threadId (str, optional): Thread timestamp to post as a reply. Defaults to None.

        Returns:
            SlackResponse | list[SlackResponse]: Single response if one channel, list of responses if multiple channels

        Raises:
            Exception: If message posting fails
        """
        responses: list[SlackResponse] = []

        if isinstance(channelList, str):
            channelList = [channelList]
        for channel in channelList:
            try:
                if threadId is None:
                    logger.debug(f"Posting {len(messageBlocks)} block message to {channel} in Slack")
                    response = cls.client.chat_postMessage(
                        channel=channel,
                        blocks=messageBlocks,
                        text="Summary text unavailable",
                        mrkdwn=True,
                        parse="none",
                        unfurl_links=False,
                        unfurl_media=False,
                        link_names=True,
                        as_user=True,
                    )
                else:
                    logger.debug(f"Posting {len(messageBlocks)} block message to {channel} on thread {threadId} in Slack")
                    response = cls.client.chat_postMessage(
                        channel=channel,
                        thread_ts=threadId,
                        blocks=messageBlocks,
                        text="Summary text unavailable",
                        mrkdwn=True,
                        parse="none",
                        unfurl_links=False,
                        unfurl_media=False,
                        link_names=True,
                        as_user=True,
                    )

                if not response["ok"]:
                    logger.warning(response.data)
                    logger.warning("Failed to call Slack API")
                responses.append(response)
            except Exception as e:
                logger.error(f"Fail to send a message to {channel}: {e}")
                raise

        return responses if len(responses) > 1 else responses[0]

    def postMessageText(
        cls,
        channelList: str | list[str],
        message: str,
        attachments=None,
        threadId: str = None,
    ) -> SlackResponse | list[SlackResponse]:
        """
        Post a plain text message to one or more Slack channels.

        Parameters:
            channelList (str | list[str]): Single channel ID/name or list of channel IDs/names
            message (str): The text message to post
            attachments (list, optional): List of message attachments. Defaults to None.
            threadId (str, optional): Thread timestamp to post as a reply. Defaults to None.

        Returns:
            SlackResponse | list[SlackResponse]: Single response if one channel, list of responses if multiple channels

        Raises:
            Exception: If message posting fails
        """
        responses: list[SlackResponse] = []

        if isinstance(channelList, str):
            channelList = [channelList]

        for channel in channelList:
            if threadId is None:
                logger.debug(f"Posting message to {channel} in Slack")
                response = cls.client.chat_postMessage(
                    channel=channel,
                    text=message,
                    attachments=attachments,
                    mrkdwn=True,
                    parse="none",
                    unfurl_links=False,
                    unfurl_media=False,
                    link_names=True,
                    as_user=True,
                )
            else:
                logger.debug(f"Posting message to {channel} on thread {threadId} in Slack")
                response = cls.client.chat_postMessage(
                    channel=channel,
                    thread_ts=threadId,
                    text=message,
                    attachments=attachments,
                    mrkdwn=True,
                    parse="none",
                    unfurl_links=False,
                    unfurl_media=False,
                    link_names=True,
                    as_user=True,
                )

            if not response["ok"]:
                logger.warning(response.data)
                logger.warning("Failed to call Slack API")
            responses.append(response)

        return responses if len(responses) > 1 else responses[0]

    def createMessagePermalink(
        cls,
        slackResponse: SlackResponse = None,
        channelId: str = None,
        messageTimestamp: str = None,
        domain: str = "ibm-mas",
    ) -> str:
        """
        Create a permanent link to a Slack message.

        Parameters:
            slackResponse (SlackResponse, optional): Slack response object containing channel and timestamp. Defaults to None.
            channelId (str, optional): Channel ID if not using slackResponse. Defaults to None.
            messageTimestamp (str, optional): Message timestamp if not using slackResponse. Defaults to None.
            domain (str, optional): Slack workspace domain. Defaults to "ibm-mas".

        Returns:
            str: Permanent URL to the Slack message

        Raises:
            Exception: If neither slackResponse nor both channelId and messageTimestamp are provided
        """
        if slackResponse is not None:
            channelId = slackResponse["channel"]
            messageTimestamp = slackResponse["ts"]
        elif channelId is None or messageTimestamp is None:
            raise Exception("Either channelId and messageTimestamp, or slackReponse params must be provided")

        return f"https://{domain}.slack.com/archives/{channelId}/p{messageTimestamp.replace('.', '')}"

    def updateMessageBlocks(cls, channelName: str, threadId: str, messageBlocks: list) -> SlackResponse:
        """
        Update an existing Slack message with new block content.

        Parameters:
            channelName (str): The channel ID or name containing the message
            threadId (str): The timestamp of the message to update
            messageBlocks (list): List of Slack block kit elements for the updated message

        Returns:
            SlackResponse: Response from the Slack API

        Raises:
            Exception: If message update fails
        """
        logger.debug(f"Updating {len(messageBlocks)} block message in {channelName} on thread {threadId} in Slack")
        response = cls.client.chat_update(
            channel=channelName,
            ts=threadId,
            blocks=messageBlocks,
            mrkdwn=True,
            parse="none",
            unfurl_links=False,
            unfurl_media=False,
            link_names=True,
            as_user=True,
        )

        if not response["ok"]:
            logger.warning(response.data)
            logger.warning("Failed to call Slack API")
        return response

    def buildHeader(cls, title: str) -> dict:
        """
        Build a header block for a Slack message.

        Parameters:
            title (str): The header text

        Returns:
            dict: Slack block kit header element
        """
        return {
            "type": "header",
            "text": {"type": "plain_text", "text": title, "emoji": True},
        }

    def buildSection(cls, text: str) -> dict:
        """
        Build a section block for a Slack message with markdown text.

        Parameters:
            text (str): The section text (supports markdown formatting)

        Returns:
            dict: Slack block kit section element
        """
        return {"type": "section", "text": {"type": "mrkdwn", "text": text}}

    def buildContext(cls, texts: list) -> dict:
        """
        Build a context block for a Slack message with multiple text elements.

        Parameters:
            texts (list): List of text strings to include in the context

        Returns:
            dict: Slack block kit context element
        """
        elements = []
        for text in texts:
            elements.append({"type": "mrkdwn", "text": text})

        return {"type": "context", "elements": elements}

    def buildDivider(cls) -> dict:
        """
        Build a divider block for a Slack message.

        Returns:
            dict: Slack block kit divider element
        """

    def createThreadConfigMap(cls, namespace: str, instanceId: str, pipelineRunName: str) -> bool:
        """
        Create a ConfigMap to store Slack thread information for a pipeline run.

        Parameters:
            namespace (str): Kubernetes namespace for the ConfigMap
            channelId (str): Slack channel ID where the thread was created
            threadId (str): Slack thread timestamp
            instanceId (str): Name of the Mas Instance ID (can be None or empty for update pipeline)

        Returns:
            bool: True if ConfigMap was created successfully, False otherwise
        """
        try:
            # Load Kubernetes configuration
            try:
                config.load_incluster_config()
            except Exception:
                config.load_kube_config()
            v1 = client.CoreV1Api()
            # For update pipeline (no instance ID), use "update" as identifier
            instance_identifier = instanceId if instanceId else "update"
            configmap_name = f"slack-thread-{instance_identifier}-{pipelineRunName}"
            configmap = client.V1ConfigMap(
                metadata=client.V1ObjectMeta(name=configmap_name, namespace=namespace),
                data={
                    "pipelineName": pipelineRunName,
                    "instanceId": instanceId,
                    "startTime": datetime.now(timezone.utc),
                },
            )
            v1.create_namespaced_config_map(namespace=namespace, body=configmap)
            logger.info(f"Created ConfigMap {configmap_name} in namespace {namespace}")
            return True
        except Exception as e:
            logger.error(f"Failed to create ConfigMap: {e}")
            return False

    def getThreadConfigMap(cls, namespace: str, instanceId: str, pipelineRunName: str) -> dict | None:
        """
        Retrieve Slack thread information from a ConfigMap.

        Parameters:
            namespace (str): Kubernetes namespace containing the ConfigMap
            instanceId (str): Unique identifier for the pipeline run (can be None or empty for update pipeline)

        Returns:
            dict | None: Dictionary containing threadId, channelId, pipelineName, and startTime, or None if not found
        """
        try:
            # Load Kubernetes configuration
            try:
                config.load_incluster_config()
            except Exception:
                config.load_kube_config()
            v1 = client.CoreV1Api()
            # For update pipeline (no instance ID), use "update" as identifier
            instance_identifier = instanceId if instanceId else "update"
            configmap_name = f"slack-thread-{instance_identifier}-{pipelineRunName}"
            configmap = v1.read_namespaced_config_map(name=configmap_name, namespace=namespace)
            logger.debug(f"Retrieved ConfigMap {configmap_name} from namespace {namespace}")
            return configmap.data
        except client.exceptions.ApiException as e:
            if e.status == 404:
                logger.debug(f"ConfigMap slack-thread-{instanceId}-{pipelineRunName} not found in namespace {namespace}")
            else:
                logger.error(f"Failed to retrieve ConfigMap: {e}")
            return None
        except Exception as e:
            logger.error(f"Failed to retrieve ConfigMap: {e}")
            return None

    def updateThreadConfigMap(cls, namespace: str, instanceId: str, updates: dict, pipelineRunName: str) -> bool:
        """
        Update the ConfigMap with additional data (e.g., task message timestamps).

        Parameters:
            namespace (str): Kubernetes namespace containing the ConfigMap
            instanceId (str): Unique identifier for the pipeline run (can be None or empty for update pipeline)
            updates (dict): Dictionary of key-value pairs to add/update in the ConfigMap

        Returns:
            bool: True if ConfigMap was updated successfully, False otherwise
        """
        try:
            # Load Kubernetes configuration
            try:
                config.load_incluster_config()
            except Exception:
                config.load_kube_config()
            v1 = client.CoreV1Api()
            # For update pipeline (no instance ID), use "update" as identifier
            instance_identifier = instanceId if instanceId else "update"
            configmap_name = f"slack-thread-{instance_identifier}-{pipelineRunName}"

            # Get existing ConfigMap
            configmap = v1.read_namespaced_config_map(name=configmap_name, namespace=namespace)

            # Update data
            if configmap.data is None:
                configmap.data = {}
            configmap.data.update(updates)

            # Patch the ConfigMap
            v1.patch_namespaced_config_map(name=configmap_name, namespace=namespace, body=configmap)
            logger.debug(f"Updated ConfigMap {configmap_name} in namespace {namespace}")
            return True
        except Exception as e:
            logger.error(f"Failed to update ConfigMap: {e}")
            return False

    def deleteThreadConfigMap(cls, namespace: str, instanceId: str, pipelineRunName: str) -> bool:
        """
        Delete the ConfigMap containing Slack thread information.

        Parameters:
            namespace (str): Kubernetes namespace containing the ConfigMap
            instanceId (str): Unique identifier for the pipeline run (can be None or empty for update pipeline)
            pipelineRunName (str): Unique identifier for the pipeline run

        Returns:
            bool: True if ConfigMap was deleted successfully, False otherwise
        """
        try:
            # Load Kubernetes configuration
            try:
                config.load_incluster_config()
            except Exception:
                config.load_kube_config()

            v1 = client.CoreV1Api()
            # For update pipeline (no instance ID), use "update" as identifier
            instance_identifier = instanceId if instanceId else "update"
            configmap_name = f"slack-thread-{instance_identifier}-{pipelineRunName}"
            v1.delete_namespaced_config_map(name=configmap_name, namespace=namespace)
            logger.info(f"Deleted ConfigMap {configmap_name} from namespace {namespace}")
            return True
        except client.exceptions.ApiException as e:
            if e.status == 404:
                logger.warning(f"ConfigMap slack-thread-{instanceId}-{pipelineRunName} not found in namespace {namespace}")
            else:
                logger.error(f"Failed to delete ConfigMap: {e}")
            return False
        except Exception as e:
            logger.error(f"Failed to delete ConfigMap: {e}")
            return False
        return {"type": "divider"}
Attributes
client property

Get or create the Slack WebClient instance.

Lazily initializes the Slack client using the SLACK_TOKEN environment variable.

Returns:

Name Type Description
WebClient WebClient

The Slack WebClient instance

Raises:

Type Description
Exception

If SLACK_TOKEN environment variable is not set

Methods:
postMessageBlocks(channelList, messageBlocks, threadId=None)

Post a message with block formatting to one or more Slack channels.

Parameters:

Name Type Description Default
channelList str | list[str]

Single channel ID/name or list of channel IDs/names

required
messageBlocks list

List of Slack block kit elements defining the message structure

required
threadId str

Thread timestamp to post as a reply. Defaults to None.

None

Returns:

Type Description
SlackResponse | list[SlackResponse]

SlackResponse | list[SlackResponse]: Single response if one channel, list of responses if multiple channels

Raises:

Type Description
Exception

If message posting fails

Source code in src/mas/devops/slack.py
def postMessageBlocks(cls, channelList: str | list[str], messageBlocks: list, threadId: str = None) -> SlackResponse | list[SlackResponse]:
    """
    Post a message with block formatting to one or more Slack channels.

    Parameters:
        channelList (str | list[str]): Single channel ID/name or list of channel IDs/names
        messageBlocks (list): List of Slack block kit elements defining the message structure
        threadId (str, optional): Thread timestamp to post as a reply. Defaults to None.

    Returns:
        SlackResponse | list[SlackResponse]: Single response if one channel, list of responses if multiple channels

    Raises:
        Exception: If message posting fails
    """
    responses: list[SlackResponse] = []

    if isinstance(channelList, str):
        channelList = [channelList]
    for channel in channelList:
        try:
            if threadId is None:
                logger.debug(f"Posting {len(messageBlocks)} block message to {channel} in Slack")
                response = cls.client.chat_postMessage(
                    channel=channel,
                    blocks=messageBlocks,
                    text="Summary text unavailable",
                    mrkdwn=True,
                    parse="none",
                    unfurl_links=False,
                    unfurl_media=False,
                    link_names=True,
                    as_user=True,
                )
            else:
                logger.debug(f"Posting {len(messageBlocks)} block message to {channel} on thread {threadId} in Slack")
                response = cls.client.chat_postMessage(
                    channel=channel,
                    thread_ts=threadId,
                    blocks=messageBlocks,
                    text="Summary text unavailable",
                    mrkdwn=True,
                    parse="none",
                    unfurl_links=False,
                    unfurl_media=False,
                    link_names=True,
                    as_user=True,
                )

            if not response["ok"]:
                logger.warning(response.data)
                logger.warning("Failed to call Slack API")
            responses.append(response)
        except Exception as e:
            logger.error(f"Fail to send a message to {channel}: {e}")
            raise

    return responses if len(responses) > 1 else responses[0]
postMessageText(channelList, message, attachments=None, threadId=None)

Post a plain text message to one or more Slack channels.

Parameters:

Name Type Description Default
channelList str | list[str]

Single channel ID/name or list of channel IDs/names

required
message str

The text message to post

required
attachments list

List of message attachments. Defaults to None.

None
threadId str

Thread timestamp to post as a reply. Defaults to None.

None

Returns:

Type Description
SlackResponse | list[SlackResponse]

SlackResponse | list[SlackResponse]: Single response if one channel, list of responses if multiple channels

Raises:

Type Description
Exception

If message posting fails

Source code in src/mas/devops/slack.py
def postMessageText(
    cls,
    channelList: str | list[str],
    message: str,
    attachments=None,
    threadId: str = None,
) -> SlackResponse | list[SlackResponse]:
    """
    Post a plain text message to one or more Slack channels.

    Parameters:
        channelList (str | list[str]): Single channel ID/name or list of channel IDs/names
        message (str): The text message to post
        attachments (list, optional): List of message attachments. Defaults to None.
        threadId (str, optional): Thread timestamp to post as a reply. Defaults to None.

    Returns:
        SlackResponse | list[SlackResponse]: Single response if one channel, list of responses if multiple channels

    Raises:
        Exception: If message posting fails
    """
    responses: list[SlackResponse] = []

    if isinstance(channelList, str):
        channelList = [channelList]

    for channel in channelList:
        if threadId is None:
            logger.debug(f"Posting message to {channel} in Slack")
            response = cls.client.chat_postMessage(
                channel=channel,
                text=message,
                attachments=attachments,
                mrkdwn=True,
                parse="none",
                unfurl_links=False,
                unfurl_media=False,
                link_names=True,
                as_user=True,
            )
        else:
            logger.debug(f"Posting message to {channel} on thread {threadId} in Slack")
            response = cls.client.chat_postMessage(
                channel=channel,
                thread_ts=threadId,
                text=message,
                attachments=attachments,
                mrkdwn=True,
                parse="none",
                unfurl_links=False,
                unfurl_media=False,
                link_names=True,
                as_user=True,
            )

        if not response["ok"]:
            logger.warning(response.data)
            logger.warning("Failed to call Slack API")
        responses.append(response)

    return responses if len(responses) > 1 else responses[0]

Create a permanent link to a Slack message.

Parameters:

Name Type Description Default
slackResponse SlackResponse

Slack response object containing channel and timestamp. Defaults to None.

None
channelId str

Channel ID if not using slackResponse. Defaults to None.

None
messageTimestamp str

Message timestamp if not using slackResponse. Defaults to None.

None
domain str

Slack workspace domain. Defaults to "ibm-mas".

'ibm-mas'

Returns:

Name Type Description
str str

Permanent URL to the Slack message

Raises:

Type Description
Exception

If neither slackResponse nor both channelId and messageTimestamp are provided

Source code in src/mas/devops/slack.py
def createMessagePermalink(
    cls,
    slackResponse: SlackResponse = None,
    channelId: str = None,
    messageTimestamp: str = None,
    domain: str = "ibm-mas",
) -> str:
    """
    Create a permanent link to a Slack message.

    Parameters:
        slackResponse (SlackResponse, optional): Slack response object containing channel and timestamp. Defaults to None.
        channelId (str, optional): Channel ID if not using slackResponse. Defaults to None.
        messageTimestamp (str, optional): Message timestamp if not using slackResponse. Defaults to None.
        domain (str, optional): Slack workspace domain. Defaults to "ibm-mas".

    Returns:
        str: Permanent URL to the Slack message

    Raises:
        Exception: If neither slackResponse nor both channelId and messageTimestamp are provided
    """
    if slackResponse is not None:
        channelId = slackResponse["channel"]
        messageTimestamp = slackResponse["ts"]
    elif channelId is None or messageTimestamp is None:
        raise Exception("Either channelId and messageTimestamp, or slackReponse params must be provided")

    return f"https://{domain}.slack.com/archives/{channelId}/p{messageTimestamp.replace('.', '')}"
updateMessageBlocks(channelName, threadId, messageBlocks)

Update an existing Slack message with new block content.

Parameters:

Name Type Description Default
channelName str

The channel ID or name containing the message

required
threadId str

The timestamp of the message to update

required
messageBlocks list

List of Slack block kit elements for the updated message

required

Returns:

Name Type Description
SlackResponse SlackResponse

Response from the Slack API

Raises:

Type Description
Exception

If message update fails

Source code in src/mas/devops/slack.py
def updateMessageBlocks(cls, channelName: str, threadId: str, messageBlocks: list) -> SlackResponse:
    """
    Update an existing Slack message with new block content.

    Parameters:
        channelName (str): The channel ID or name containing the message
        threadId (str): The timestamp of the message to update
        messageBlocks (list): List of Slack block kit elements for the updated message

    Returns:
        SlackResponse: Response from the Slack API

    Raises:
        Exception: If message update fails
    """
    logger.debug(f"Updating {len(messageBlocks)} block message in {channelName} on thread {threadId} in Slack")
    response = cls.client.chat_update(
        channel=channelName,
        ts=threadId,
        blocks=messageBlocks,
        mrkdwn=True,
        parse="none",
        unfurl_links=False,
        unfurl_media=False,
        link_names=True,
        as_user=True,
    )

    if not response["ok"]:
        logger.warning(response.data)
        logger.warning("Failed to call Slack API")
    return response
buildHeader(title)

Build a header block for a Slack message.

Parameters:

Name Type Description Default
title str

The header text

required

Returns:

Name Type Description
dict dict

Slack block kit header element

Source code in src/mas/devops/slack.py
def buildHeader(cls, title: str) -> dict:
    """
    Build a header block for a Slack message.

    Parameters:
        title (str): The header text

    Returns:
        dict: Slack block kit header element
    """
    return {
        "type": "header",
        "text": {"type": "plain_text", "text": title, "emoji": True},
    }
buildSection(text)

Build a section block for a Slack message with markdown text.

Parameters:

Name Type Description Default
text str

The section text (supports markdown formatting)

required

Returns:

Name Type Description
dict dict

Slack block kit section element

Source code in src/mas/devops/slack.py
def buildSection(cls, text: str) -> dict:
    """
    Build a section block for a Slack message with markdown text.

    Parameters:
        text (str): The section text (supports markdown formatting)

    Returns:
        dict: Slack block kit section element
    """
    return {"type": "section", "text": {"type": "mrkdwn", "text": text}}
buildContext(texts)

Build a context block for a Slack message with multiple text elements.

Parameters:

Name Type Description Default
texts list

List of text strings to include in the context

required

Returns:

Name Type Description
dict dict

Slack block kit context element

Source code in src/mas/devops/slack.py
def buildContext(cls, texts: list) -> dict:
    """
    Build a context block for a Slack message with multiple text elements.

    Parameters:
        texts (list): List of text strings to include in the context

    Returns:
        dict: Slack block kit context element
    """
    elements = []
    for text in texts:
        elements.append({"type": "mrkdwn", "text": text})

    return {"type": "context", "elements": elements}
buildDivider()

Build a divider block for a Slack message.

Returns:

Name Type Description
dict dict

Slack block kit divider element

Source code in src/mas/devops/slack.py
def buildDivider(cls) -> dict:
    """
    Build a divider block for a Slack message.

    Returns:
        dict: Slack block kit divider element
    """
createThreadConfigMap(namespace, instanceId, pipelineRunName)

Create a ConfigMap to store Slack thread information for a pipeline run.

Parameters:

Name Type Description Default
namespace str

Kubernetes namespace for the ConfigMap

required
channelId str

Slack channel ID where the thread was created

required
threadId str

Slack thread timestamp

required
instanceId str

Name of the Mas Instance ID (can be None or empty for update pipeline)

required

Returns:

Name Type Description
bool bool

True if ConfigMap was created successfully, False otherwise

Source code in src/mas/devops/slack.py
def createThreadConfigMap(cls, namespace: str, instanceId: str, pipelineRunName: str) -> bool:
    """
    Create a ConfigMap to store Slack thread information for a pipeline run.

    Parameters:
        namespace (str): Kubernetes namespace for the ConfigMap
        channelId (str): Slack channel ID where the thread was created
        threadId (str): Slack thread timestamp
        instanceId (str): Name of the Mas Instance ID (can be None or empty for update pipeline)

    Returns:
        bool: True if ConfigMap was created successfully, False otherwise
    """
    try:
        # Load Kubernetes configuration
        try:
            config.load_incluster_config()
        except Exception:
            config.load_kube_config()
        v1 = client.CoreV1Api()
        # For update pipeline (no instance ID), use "update" as identifier
        instance_identifier = instanceId if instanceId else "update"
        configmap_name = f"slack-thread-{instance_identifier}-{pipelineRunName}"
        configmap = client.V1ConfigMap(
            metadata=client.V1ObjectMeta(name=configmap_name, namespace=namespace),
            data={
                "pipelineName": pipelineRunName,
                "instanceId": instanceId,
                "startTime": datetime.now(timezone.utc),
            },
        )
        v1.create_namespaced_config_map(namespace=namespace, body=configmap)
        logger.info(f"Created ConfigMap {configmap_name} in namespace {namespace}")
        return True
    except Exception as e:
        logger.error(f"Failed to create ConfigMap: {e}")
        return False
getThreadConfigMap(namespace, instanceId, pipelineRunName)

Retrieve Slack thread information from a ConfigMap.

Parameters:

Name Type Description Default
namespace str

Kubernetes namespace containing the ConfigMap

required
instanceId str

Unique identifier for the pipeline run (can be None or empty for update pipeline)

required

Returns:

Type Description
dict | None

dict | None: Dictionary containing threadId, channelId, pipelineName, and startTime, or None if not found

Source code in src/mas/devops/slack.py
def getThreadConfigMap(cls, namespace: str, instanceId: str, pipelineRunName: str) -> dict | None:
    """
    Retrieve Slack thread information from a ConfigMap.

    Parameters:
        namespace (str): Kubernetes namespace containing the ConfigMap
        instanceId (str): Unique identifier for the pipeline run (can be None or empty for update pipeline)

    Returns:
        dict | None: Dictionary containing threadId, channelId, pipelineName, and startTime, or None if not found
    """
    try:
        # Load Kubernetes configuration
        try:
            config.load_incluster_config()
        except Exception:
            config.load_kube_config()
        v1 = client.CoreV1Api()
        # For update pipeline (no instance ID), use "update" as identifier
        instance_identifier = instanceId if instanceId else "update"
        configmap_name = f"slack-thread-{instance_identifier}-{pipelineRunName}"
        configmap = v1.read_namespaced_config_map(name=configmap_name, namespace=namespace)
        logger.debug(f"Retrieved ConfigMap {configmap_name} from namespace {namespace}")
        return configmap.data
    except client.exceptions.ApiException as e:
        if e.status == 404:
            logger.debug(f"ConfigMap slack-thread-{instanceId}-{pipelineRunName} not found in namespace {namespace}")
        else:
            logger.error(f"Failed to retrieve ConfigMap: {e}")
        return None
    except Exception as e:
        logger.error(f"Failed to retrieve ConfigMap: {e}")
        return None
updateThreadConfigMap(namespace, instanceId, updates, pipelineRunName)

Update the ConfigMap with additional data (e.g., task message timestamps).

Parameters:

Name Type Description Default
namespace str

Kubernetes namespace containing the ConfigMap

required
instanceId str

Unique identifier for the pipeline run (can be None or empty for update pipeline)

required
updates dict

Dictionary of key-value pairs to add/update in the ConfigMap

required

Returns:

Name Type Description
bool bool

True if ConfigMap was updated successfully, False otherwise

Source code in src/mas/devops/slack.py
def updateThreadConfigMap(cls, namespace: str, instanceId: str, updates: dict, pipelineRunName: str) -> bool:
    """
    Update the ConfigMap with additional data (e.g., task message timestamps).

    Parameters:
        namespace (str): Kubernetes namespace containing the ConfigMap
        instanceId (str): Unique identifier for the pipeline run (can be None or empty for update pipeline)
        updates (dict): Dictionary of key-value pairs to add/update in the ConfigMap

    Returns:
        bool: True if ConfigMap was updated successfully, False otherwise
    """
    try:
        # Load Kubernetes configuration
        try:
            config.load_incluster_config()
        except Exception:
            config.load_kube_config()
        v1 = client.CoreV1Api()
        # For update pipeline (no instance ID), use "update" as identifier
        instance_identifier = instanceId if instanceId else "update"
        configmap_name = f"slack-thread-{instance_identifier}-{pipelineRunName}"

        # Get existing ConfigMap
        configmap = v1.read_namespaced_config_map(name=configmap_name, namespace=namespace)

        # Update data
        if configmap.data is None:
            configmap.data = {}
        configmap.data.update(updates)

        # Patch the ConfigMap
        v1.patch_namespaced_config_map(name=configmap_name, namespace=namespace, body=configmap)
        logger.debug(f"Updated ConfigMap {configmap_name} in namespace {namespace}")
        return True
    except Exception as e:
        logger.error(f"Failed to update ConfigMap: {e}")
        return False
deleteThreadConfigMap(namespace, instanceId, pipelineRunName)

Delete the ConfigMap containing Slack thread information.

Parameters:

Name Type Description Default
namespace str

Kubernetes namespace containing the ConfigMap

required
instanceId str

Unique identifier for the pipeline run (can be None or empty for update pipeline)

required
pipelineRunName str

Unique identifier for the pipeline run

required

Returns:

Name Type Description
bool bool

True if ConfigMap was deleted successfully, False otherwise

Source code in src/mas/devops/slack.py
def deleteThreadConfigMap(cls, namespace: str, instanceId: str, pipelineRunName: str) -> bool:
    """
    Delete the ConfigMap containing Slack thread information.

    Parameters:
        namespace (str): Kubernetes namespace containing the ConfigMap
        instanceId (str): Unique identifier for the pipeline run (can be None or empty for update pipeline)
        pipelineRunName (str): Unique identifier for the pipeline run

    Returns:
        bool: True if ConfigMap was deleted successfully, False otherwise
    """
    try:
        # Load Kubernetes configuration
        try:
            config.load_incluster_config()
        except Exception:
            config.load_kube_config()

        v1 = client.CoreV1Api()
        # For update pipeline (no instance ID), use "update" as identifier
        instance_identifier = instanceId if instanceId else "update"
        configmap_name = f"slack-thread-{instance_identifier}-{pipelineRunName}"
        v1.delete_namespaced_config_map(name=configmap_name, namespace=namespace)
        logger.info(f"Deleted ConfigMap {configmap_name} from namespace {namespace}")
        return True
    except client.exceptions.ApiException as e:
        if e.status == 404:
            logger.warning(f"ConfigMap slack-thread-{instanceId}-{pipelineRunName} not found in namespace {namespace}")
        else:
            logger.error(f"Failed to delete ConfigMap: {e}")
        return False
    except Exception as e:
        logger.error(f"Failed to delete ConfigMap: {e}")
        return False
    return {"type": "divider"}