Source code for stanford.mais.workgroup.workgroup

# vim: ts=4 sw=4 et
# -*- coding: utf-8 -*-

# © 2021 The Board of Trustees of the Leland Stanford Junior University.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

# This file has a ton of references to classes that are defined in the same
# file.  Pythons older than 3.14 (which implements PEP 649) cannot handle that
# natively without this import.
# NOTE: At some point in the future, this annotations import will be deprecated.
from __future__ import annotations

# Start with stdlib imports
from collections.abc import Mapping
import dataclasses
import datetime
import logging
import pathlib
import re
import requests
from typing import Any, Literal, TYPE_CHECKING
import weakref
import zoneinfo

# Finally, do local imports
from stanford.mais.workgroup.properties import *
from stanford.mais.workgroup.member import *

# There are some needed type annotations where, if we imported them now, we
# would make an import loop.  So, only import them when type-checking.
if TYPE_CHECKING:
    from stanford.mais.workgroup import WorkgroupClient

# Set up logging

logger = logging.getLogger(__name__)
debug = logger.debug
info = logger.info
warning = logger.warning
error = logger.error

__all__ = (
    'Workgroup',
    'WorkgroupDeleted',
    'WorkgroupFilter',
    'WorkgroupVisibility',
    'PrivgroupContents',
    'PrivgroupEntry',
)


# First, some Privgroup-related stuff


@dataclasses.dataclass(frozen=True)
class PrivgroupEntry:
    """A single person listed in a privgroup.

    Instances are immutable (the dataclass is frozen), so they can be stored
    in sets.
    """

    sunetid: str
    """The person's SUNetID.
    """

    name: str
    """The person's name, in 'Last, First' format.
    """

    last_update: datetime.date
    """The date when the person became part of the privgroup.
    """

    @classmethod
    def from_json(
        cls,
        json: dict[str, Any],
    ) -> PrivgroupEntry:
        """Build a privgroup entry from a Workgroups API JSON object.

        :param json: The decoded JSON object for one person.  It must contain
            the keys ``name``, ``id``, and ``lastUpdate``.

        :returns: The new entry.  Trailing whitespace is stripped from the
            name, and ``lastUpdate`` is converted with
            :meth:`Workgroup.datestr_to_date`.

        :raises KeyError: A required key is missing.  The exception argument
            is the name of the first missing key, checked in the order
            ``name``, ``id``, ``lastUpdate``.
        """
        # Fail early, and in a fixed order, on the first absent key.
        for required_key in ('name', 'id', 'lastUpdate'):
            if required_key not in json:
                raise KeyError(required_key)

        # Everything we need is present, so assemble the entry.
        sunetid = json['id']
        display_name = json['name'].rstrip()
        joined_on = Workgroup.datestr_to_date(json['lastUpdate'])
        return cls(
            sunetid=sunetid,
            name=display_name,
            last_update=joined_on,
        )
@dataclasses.dataclass(frozen=True)
class PrivgroupContents:
    """The two sets of :class:`PrivgroupEntry` that make up a privgroup.

    The dataclass is frozen, so the two attributes cannot be rebound after
    construction.
    """

    # The entries listed as members.
    members: set[PrivgroupEntry]

    # The entries listed as administrators.
    administrators: set[PrivgroupEntry]
# Next, Workgroup-related exceptions:
class WorkgroupDeleted(KeyError):
    """Workgroup used to exist, but has been deleted.

    Raised whenever a workgroup is "inactive": it existed at some point, but
    has since been deleted.  Contrast that with a workgroup that never
    existed, which triggers a plain :class:`KeyError`.

    Callers who care about the difference can catch this exception
    specifically.  Callers who do not care can simply catch
    :class:`KeyError`, which also catches this exception because it is a
    subclass.
    """
# Now, on to the Workgroup!
[docs] class Workgroup: """A Stanford Workgroup. """ # Methods are grouped by their position in the CRUD acronym: # C: Create # R: Read/Report # U: Update # D: Delete # # "C" methods #
[docs] @classmethod def create( cls, client: WorkgroupClient, name: str, description: str, filter: WorkgroupFilter = WorkgroupFilter.NONE, privgroup: bool = True, reusable: bool = True, visibility: WorkgroupVisibility = WorkgroupVisibility.STANFORD, ) -> Workgroup: """Create a new workgroup. Your client certificate must be a stem owner in order to create workgroups in a given stem. Once created, a workgroup will have no members, and two administrators: * *The stem-owner group*. For example, workgroup ``abc:def`` will have ``workgroup:abc-owners`` nested as an administrator. **This cannot be changed**. * *Your client certificate*. Even though your client certificate is already a stem owner—and therefore an administrator—it will be explicitly added as an administrator. You *are allowed* to remove your client certificate from the list of workgroup administrators. To do so, use code like this: .. code-block:: python wclient = WorkgroupClient(...) workgroup = Workgroup.create(client=wclient, ...) workgroup.administrators.certificates.remove(CLIENT_CERT_CN) :param client: A :class:`WorkgroupClient`. :param name: The fully-qualified workgroup name. Required. For example, to create a workgroup named `abc` in stem `school`, use name `school:abc`. The name portion is limited to 81 characters, which may contain only lowercase letters, numbers, hyphens, and underscores. The name portion must start with a letter or number. :param description: The workgroup description. It must contain at least one character. Limited to 255 characters from the ISO 8859-1 ("Latin 1") character set. :param filter: Optional. See :meth:`filter`. Defaults to `NONE`. :param privgroup: Optional. See :meth:`privgroup`. Defaults to `True`. :param reusable: Optional. See :meth:`reusable`. Defaults to `True`. :param visibility: Optional. See :meth:`visibility`. Defaults to `STANFORD`. :returns: An instance of the newly-created workgroup. 
:raises ChildProcessError: Something went wrong on the server side (a 400 or 500 error was returned). :raise IndexError: The proposed workgroup name or description is too short or too long. :raise ValueError: The proposed workgroup name or description has invalid characters. :raise KeyError: A workgroup with this name already exists. :raises WorkgroupDeleted: The proposed workgroup name was already used for a workgroup, and that workgroup has since been deleted. .. note:: Deleted workgroups are not really deleted, just hidden. Stem owners can restore 'deleted' workgroups through the Workgroup Manager web site. :raises PermissionError: You did not use a valid certificate, or do not have permissions to perform the operation. :raises NotImplementedError: Received an unexpected HTTP response code. :raises requests.Timeout: The MaIS Workgroup API did not respond in time. """ debug(f"In create for name '{name}'") # Check the length of the workgroup name. if len(name) > 81: error(f"Proposed name '{name}' is too long") raise IndexError('name') # Check if the name is invalid. if re.fullmatch( r'[a-z0-9][a-z0-9_-]*:[a-z0-9][a-z0-9_-]*', name, ) is None: error(f"Proposed name '{name}' is not valid") raise ValueError('name') # Check the length of the description. if len(description) == 0: error('Proposed description is too short') raise IndexError('description') if len(description) > 255: error(f"Proposed description is too long") raise IndexError('description') # Check if the description has nonprintable characters or is non-Latin1 try: description.encode('latin1') except UnicodeError: error('Proposed description is not encodable in ISO 8859-1') raise ValueError('description') if not description.isprintable(): error(f"Proposed description has non-printable characters") raise ValueError('description') # Get the Requests session session = client.client.session # Make the Workgroup. 
post_url = client._url( fragment=name, ) debug(f"Filter is '{filter}' — Visibility is '{visibility}'") debug(f"Running POST to {post_url} to create workgroup") response = client.client.session.post( post_url, json={ 'description': description, 'filter': str(filter), 'privgroup': ('TRUE' if privgroup is True else 'FALSE'), 'reusable': ('TRUE' if reusable is True else 'FALSE'), 'visibility': str(visibility), }, ) # Catch an inactive workgroup. match response.status_code: case 400: response_json = None try: response_json = response.json() except requests.exceptions.JSONDecodeError: pass # Did our workgroup go inactive out from under us? if ( response_json is not None and response_json['notification'] == 'Workgroup is inactive' ): warning(f"Workgroup {name} used to exist but has been deleted") raise WorkgroupDeleted(name) else: # We have a generic 400 error raise ChildProcessError(response.text) case 500: error(f"Upstream API error: {response.text}") raise ChildProcessError(response.text) case 401 | 403: warning(f"Permission error on create workgroup {name}") raise PermissionError(response.text) case 409: # Catch an already-existing workgroup warning(f"Workgroup {name} already exists") raise KeyError(name) case _ if response.status_code != 201: raise NotImplementedError(response.text) # Decode the JSON, and send to make the instance debug(f"Got back a response!") response_json = response.json() result = cls( client=client, from_json=response_json, ) # Put our new workgroup into the cache, and return client._cache[name] = weakref.ref(result) return result
[docs] @classmethod def get( cls, client: WorkgroupClient, name: str, ) -> Workgroup: """Fetch a Workgroup. Fetch an existing Workgroup using the MaIS Workgroup API, and return the corresponding :class:`Workgroup` instance. If the instance already exists in the cache, the cached instance will be returned. .. note:: Use the :attr:`last_refresh` property to see if your instance is too old, and the :meth:`refresh` method to refresh it. .. warning:: If the workgroup's visibility is set to `PRIVATE`, and your client certificate is not an administrator of the workgroup (either directly, or via stem ownership), then… * The :attr:`can_see_membership` property will be ``False``; * The sets of members and administrators will be empty; and * All attempts to access the privgroup list or make changes will return a :class:`PermissionError`. :param client: A :class:`WorkgroupClient` representing our API endpoint. :param name: The name of the workgroup to fetch. :raises ChildProcessError: Something went wrong on the server side (a 400 or 500 error was returned), or you did not provide a workgroup name. :raises KeyError: The workgroup does not exist, and has never existed. :raises WorkgroupDeleted: The workgroup used to exist, but was deleted. :raises PermissionError: You did not use a valid certificate. :raises NotImplementedError: Received an unexpected HTTP response code. :raises requests.Timeout: The MaIS Workgroup API did not respond in time. """ # Lowercase the name before we continue name = name.lower() debug(f"In get for Workgroup name {name}") # Check if the Workgroup is in the cache if name in client._cache: result = client._cache[name]() if result is not None: debug(f"Returning Workgroup {name} from cache") return result else: debug(f"{name} expired in cache. Re-fetching…") # Make the request for the Workgroup. get_url = client._url( fragment=name, ) debug(f"Doing GET of {get_url}") response = client.client.session.get( get_url, ) # Catch an inactive workgroup. 
match response.status_code: case 400: response_json = None try: response_json = response.json() except requests.exceptions.JSONDecodeError: pass # Did our workgroup go inactive out from under us? if ( response_json is not None and response_json['notification'] == 'Workgroup is inactive' ): warning(f"Workgroup {name} has been deleted") raise WorkgroupDeleted(name) else: # We have a generic 400 error raise ChildProcessError(response.text) case 500: error(f"Upstream API error: {response.text}") raise ChildProcessError(response.text) case 401 | 403: warning(f"Permission error on get {name}") raise PermissionError(response.text) case 404: warning(f"Workgroup {name} not found") raise KeyError(name) case _ if response.status_code != 200: raise NotImplementedError(response.text) # Decode the JSON, and send to make the instance debug('Got a response!') response_json = response.json() result = cls( client=client, from_json=response_json, ) # Put into the cache and return! client._cache[name] = weakref.ref(result) return result
def refresh(self) -> None:
    """Refresh an existing Workgroup instance.

    Query the Workgroups API again and update this instance from the
    response.  Every property is refreshed, including membership.  The only
    thing guaranteed *not* to change is the workgroup's name.

    .. note::
       It is possible that your client certificate gained administrator
       access between this instance's creation, and now.  It is also
       possible that your client certificate *lost* administrator access.

    .. danger::
       It is also possible that someone else has deleted the workgroup.  If
       that happens, the :attr:`deleted` property will be set and a
       :class:`WorkgroupDeleted` exception will be raised.

    .. warning::
       If your client certificate is not an administrator of the workgroup
       (either directly, or via stem ownership), then…

       * The :attr:`can_see_membership` property will be ``False``;

       * The sets of members and administrators will be empty; and

       * All attempts to access the privgroup list or make changes will
         return a :class:`PermissionError`.

    :raises EOFError: This instance was already marked as deleted before the
        call; no request is made in that case.

    :raises ChildProcessError: Something went wrong on the server side (a 400
        or 500 error was returned).

    :raises PermissionError: You did not use a valid certificate, or do not
        have permissions to perform the operation.

    :raises WorkgroupDeleted: The workgroup has been deleted.

    :raises KeyError: The API answered with a 404 for this workgroup.

    :raises NotImplementedError: Received an unexpected HTTP response code.

    :raises requests.Timeout: The MaIS Workgroup API did not respond in time.
    """
    debug(f"In refresh for Workgroup name {self.name}")

    # An instance already known to be deleted cannot be refreshed.
    if self.deleted:
        raise EOFError('Workgroup has been deleted')

    # Ask the API for the current state of the workgroup.
    workgroup_client = self.client
    url = workgroup_client._url(fragment=self.name)
    debug(f"Doing GET of {url}")
    api_response = workgroup_client.client.session.get(url)

    # Let the shared handler update the instance, or raise.
    return self._handle_refresh(api_response)
def _mark_deleted(self) -> None: """Mark an instance as deleted When a workgroup is deleted, this updates various fields in the Workgroup instance. This could happen because we intentionally deleted the workgroup, or the workgroup could have been deleted by someone else, after the instance was created. """ # We will leave all existing properties alone, but we will force # the members and administrators sets to become empty. self._members.update_from_upstream(list()) self._administrators.update_from_upstream(list()) # Update last-refresh & mark the instance as deleted self._last_refresh = datetime.datetime.now(tz=datetime.timezone.utc) self._deleted = True # All done! def _handle_refresh( self, response: requests.Response, ) -> None: """Process the response from a refresh. This method takes the Requests Response that is generated by doing a refresh, and updates this instance appropriately. Potentially, *any* part of this instance can change, except for the workgroup's name. That workgroup may even have been deleted! For this reason, this method takes the entire Response, not just the JSON part. :raises ChildProcessError: Something went wrong on the server side (a 400 or 500 error was returned). :raises PermissionError: You did not use a valid certificate, or do not have permissions to perform the operation. :raises WorkgroupDeleted: The workgroup has been deleted. :raises NotImplementedError: Received an unexpected HTTP response code. """ # Did our change go through? if response.status_code == 200: # It worked! Send it through for normal processing. debug('Got a response!') response_json = response.json() self._from_json(response_json) # Catch various errors match response.status_code: case 400: response_json = None try: response_json = response.json() except requests.exceptions.JSONDecodeError: pass # Did our workgroup go inactive out from under us? 
if ( response_json is not None and response_json['notification'] == 'Workgroup is inactive' ): error(f"Already-instanced workgroup {self.name} has been deleted") self._mark_deleted() raise WorkgroupDeleted(self.name) else: # We have a generic 400 error raise ChildProcessError(response.text) case 500: error(f"Upstream API error: {response.text}") raise ChildProcessError(response.text) case 401 | 403: warning(f"Permission error on get {self.name}") raise PermissionError(response.text) case 404: warning(f"Workgroup {self.name} not found") raise KeyError(self.name) case _ if response.status_code != 200: raise NotImplementedError(response.text) # All done! return None # Actually set up a new instance. def __init__( self, **kwargs, ) -> None: debug('In __init__') # Error out if we didn't get our expected data. if 'from_json' not in kwargs or 'client' not in kwargs: raise NotImplementedError('Do not instantiate Workgroups directly.') client: 'WorkgroupClient' = kwargs['client'] response_json: Mapping[str, Any] = kwargs['from_json'] # We know that this workgroup exists right now. self._deleted = False # Set a couple of properties that will never change. self._client = client self._name = response_json['name'] # Make empty containers for members and administrators self._members: WorkgroupMembership = WorkgroupMembership( workgroup=self, collection_type='members', ) self._administrators: WorkgroupMembership = WorkgroupMembership( workgroup=self, collection_type='administrators', ) # Process the remaining properties self._from_json(response_json) def _from_json( self, response_json: Mapping[str, Any], ) -> None: """Process the JSON from a Workgroups API "Get Workgroup" call. This method updates our instance with all of the data received from the Workgroups API. It is meant to be called on instance creation, and on instance refresh. .. note:: This method does not handle setting the workgroup name. That never changes throughout a workgroup's life. 
:param response_json: The dict containing the decoded Workgroup JSON. """ debug(f"In _from_json for workgroup {self.name}") # The description is the only field that might not be present. if 'description' in response_json: self._description = response_json['description'] else: self._description = '' debug(f"Name is {self._name}, description is {len(self._description)} characters") # Set our boolean properties if response_json['privgroup'] == 'TRUE': debug('Privgroup is on') self._privgroup = True else: debug('Privgroup is OFF') self._privgroup = False if response_json['reusable'] == 'TRUE': debug('Reusable is on') self._reusable = True else: debug('Reusable is OFF') self._reusable = False # Set our enum properties self._visibility = WorkgroupVisibility.from_str( response_json['visibility'] ) self._filter = WorkgroupFilter.from_str( response_json['filter'] ) debug(f"Visibility is {self._visibility}s — Privgroup is {self._privgroup}s") # Set the last-updated date self._last_update = self.datestr_to_date(response_json['lastUpdate']) # Update the members and administrators containers. debug('Building memberships') self._members.update_from_upstream( response_json=response_json['members'], ) self._administrators.update_from_upstream( response_json=response_json['administrators'], ) # Can we see the workgroups members and administrators? # The `can_see_membership` property explains the rules, but there's a # sneaky workground we can use: # Every workgroup has the stem-owner group as administrators. So, # check if we can see *any* administrators. self._can_see_membership = (True if len(self._administrators) > 0 else False) # Update our last-refreshed time, and we're done! self._last_refresh = datetime.datetime.now(tz=datetime.timezone.utc) debug('Instance construction/update complete!') return None # # "R" methods. # # Most of the instance is accessed via properties. # We start with those that don't deal with collections of things. 
@property
def client(self) -> WorkgroupClient:
    """The :class:`WorkgroupClient` that was used to fetch this
    :class:`Workgroup`.

    This stays readable even after the workgroup has been deleted.
    """
    return self._client

@property
def deleted(self) -> bool:
    """Has the workgroup been deleted?

    Once :meth:`delete` has been called on the workgroup, this property is
    ``True``, and most other methods & properties raise an
    :class:`EOFError`.  The exceptions are this property, :attr:`client`,
    :attr:`name`, and :attr:`last_refresh`.

    .. note::
       This property may also become ``True`` if you called
       :meth:`refresh` on the instance, and the refresh reveals that the
       workgroup has been deleted.
    """
    return self._deleted

@property
def name(self) -> str:
    """The fully-qualified name of the workgroup.

    The name is fixed when the workgroup is created; it may not be changed
    afterwards.

    .. note::
       Workgroups are never actually "deleted", just hidden, so the name
       remains accessible even after deletion.
    """
    return self._name

@property
def last_refresh(self) -> datetime.datetime:
    """When the instance was last refreshed.

    This is the (aware) datetime when the instance was last refreshed from
    the Workgroup API.  The following actions can trigger a refresh:

    * Calling :meth:`create` or :meth:`get`.

    * Calling :meth:`refresh`.

    * Changing any property of the workgroup.

    .. note::
       If :meth:`delete` is called to delete the workgroup, this will
       return the datetime when the workgroup was deleted.
    """
    return self._last_refresh

@property
def description(self) -> str:
    """The workgroup description.

    This is a property: Read it to get the current description; assign a
    string to it to change the description.

    .. warning::
       Changing a workgroup property triggers a refresh of the entire
       workgroup.  See the documentation for :meth:`refresh`; all of those
       warnings apply here.

    :param str value: The new description.  It must not be empty, the
        maximum length is 255 characters, and it must be printable and
        encodable in ISO 8859-1.

    :raises ChildProcessError: Something went wrong on the server side (a
        400 or 500 error was returned).

    :raises PermissionError: You did not use a valid certificate, or do not
        have permissions to perform the operation.

    :raises requests.Timeout: The MaIS Workgroup API did not respond in
        time.

    :raises IndexError: The new description is empty or too long.

    :raises ValueError: The new description has non-printable characters,
        or characters that cannot be encoded in ISO 8859-1.

    :raises EOFError: The workgroup has been deleted.
    """
    if self.deleted:
        raise EOFError('Workgroup has been deleted')
    return self._description

@property
def last_update(self) -> datetime.date:
    """The date when the workgroup was last updated.

    This datestamp resets whenever the workgroup's properties, members, or
    administrators are updated.

    .. warning::
       This date is in the Stanford-local time zone.  Remember to take
       this into account when doing comparisons.

    .. note::
       Why is it a date, instead of a datetime?  Because that is what the
       Workgroup API provides.

    .. danger::
       Privgroup member and administrator changes do not reset the
       last_update datestamp.  This is a "last updated" for the workgroup,
       not the privgroup!

    :raises EOFError: The workgroup has been deleted.
    """
    if self.deleted:
        raise EOFError('Workgroup has been deleted')
    return self._last_update

@property
def filter(self) -> WorkgroupFilter:
    """The workgroup filter.

    This controls the effective membership of the workgroup.  See the
    definition of
    :class:`~stanford.mais.workgroup.properties.WorkgroupFilter` for more
    information on how Workgroup filters take the real membership (which
    you see through this API) and filter it to provide an effective
    membership (which others see).

    This is a property: Read it to get the current filter; assign a
    :class:`~stanford.mais.workgroup.properties.WorkgroupFilter` (or
    equivalent string) to it to change the filter.

    .. warning::
       Changing a workgroup property triggers a refresh of the entire
       workgroup.  See the documentation for :meth:`refresh`; all of those
       warnings apply here.

    :param value: The new filter.  This may either be a
        :class:`~stanford.mais.workgroup.properties.WorkgroupFilter`, or a
        string that parses cleanly into a
        :class:`~stanford.mais.workgroup.properties.WorkgroupFilter`.

    :raises ChildProcessError: Something went wrong on the server side (a
        400 or 500 error was returned).

    :raises PermissionError: You did not use a valid certificate, or do not
        have permissions to perform the operation.

    :raises requests.Timeout: The MaIS Workgroup API did not respond in
        time.

    :raises ValueError: The value provided does not match an enum value.
        This may only be thrown when providing a :class:`str` for `value`.

    :raises EOFError: The workgroup has been deleted.
    """
    if self.deleted:
        raise EOFError('Workgroup has been deleted')
    return self._filter

@property
def privgroup(self) -> bool:
    """Does the workgroup have an associated privgroup?

    A 'privgroup' is a flattened list of workgroup members and
    administrators, with certificates removed, nested workgroups
    flattened, and filters applied.

    As described in :attr:`filter`, workgroups have a *real* list of
    members (which is accessed & maintained by this API) and an
    *effective* list of members (after the workgroup membership is
    flattened and filter applied).

    If `privgroup` is ``True``, then the flattened and filtered list of
    workgroup members and administrators will be made available to
    downstream systems like LDAP.  If ``False``, then the workgroup may
    only be used within Workgroup Manager (for example, it can be nested
    in other workgroups).

    .. warning::
       If you disable privgroup on a workgroup and then nest it into
       another workgroup, this workgroup's members will **not** appear in
       the privgroup of the workgroup it is nested in.

    .. note::
       This property cannot be changed through the Workgroup Manager web
       site, only through the API.

    This is a property: Read it to get the current privgroup setting;
    assign a :class:`bool` to it to change the setting.

    .. warning::
       Changing a workgroup property triggers a refresh of the entire
       workgroup.  See the documentation for :meth:`refresh`; all of those
       warnings apply here.

    :param bool value: The new privgroup setting.

    :raises ChildProcessError: Something went wrong on the server side (a
        400 or 500 error was returned).

    :raises PermissionError: You did not use a valid certificate, or do not
        have permissions to perform the operation.

    :raises requests.Timeout: The MaIS Workgroup API did not respond in
        time.

    :raises TypeError: `value` was not a :class:`bool`.

    :raises EOFError: The workgroup has been deleted.
    """
    if self.deleted:
        raise EOFError('Workgroup has been deleted')
    return self._privgroup

@property
def reusable(self) -> bool:
    """May the workgroup be nested outside of its stem?

    If `reusable` is ``True``, then this workgroup may be nested in any
    workgroup.  Otherwise, this workgroup may only be nested within other
    workgroups of the same stem.

    "Nesting" means including the *effective* membership of one workgroup
    in another.  See :attr:`filter` for information on real vs. effective
    workgroup membership.

    .. warning::
       Changing this setting will **not** affect existing nesting
       relationships.  You can view existing nesting relationships in
       Workgroup Manager.

    This is a property: Read it to get the current reusable setting;
    assign a :class:`bool` to it to change the setting.

    .. warning::
       Changing a workgroup property triggers a refresh of the entire
       workgroup.  See the documentation for :meth:`refresh`; all of those
       warnings apply here.

    :param bool value: The new reusable setting.

    :raises ChildProcessError: Something went wrong on the server side (a
        400 or 500 error was returned).

    :raises PermissionError: You did not use a valid certificate, or do not
        have permissions to perform the operation.

    :raises requests.Timeout: The MaIS Workgroup API did not respond in
        time.

    :raises TypeError: `value` was not a :class:`bool`.

    :raises EOFError: The workgroup has been deleted.
    """
    if self.deleted:
        raise EOFError('Workgroup has been deleted')
    return self._reusable

@property
def visibility(self) -> WorkgroupVisibility:
    """Is the workgroup's membership visible to others?

    See :class:`~stanford.mais.workgroup.properties.WorkgroupVisibility`
    for an explanation of the different workgroup visibility options.

    .. danger::
       Setting this to ``PRIVATE`` can cause unexpected and unusual issues
       in downstream applications.

    This is a property: Read it to get the current visibility setting;
    assign a
    :class:`~stanford.mais.workgroup.properties.WorkgroupVisibility` (or
    equivalent string) to it to change the setting.

    .. warning::
       Changing a workgroup property triggers a refresh of the entire
       workgroup.  See the documentation for :meth:`refresh`; all of those
       warnings apply here.

    :param value: The new visibility.  This may either be a
        :class:`~stanford.mais.workgroup.properties.WorkgroupVisibility`,
        or a string that parses cleanly into a
        :class:`~stanford.mais.workgroup.properties.WorkgroupVisibility`.

    :raises ChildProcessError: Something went wrong on the server side (a
        400 or 500 error was returned).

    :raises PermissionError: You did not use a valid certificate, or do not
        have permissions to perform the operation.

    :raises requests.Timeout: The MaIS Workgroup API did not respond in
        time.

    :raises ValueError: The value provided does not match an enum value.

    :raises EOFError: The workgroup has been deleted.
    """
    if self.deleted:
        raise EOFError('Workgroup has been deleted')
    return self._visibility

@property
def can_see_membership(self) -> bool:
    """Can we see the workgroup's members and administrators?

    See :attr:`visibility` for an explanation of workgroup visibility.

    * If this workgroup's visibility is `STANFORD`, then we can see the
      members and administrators of the workgroup.

    * If this workgroup's visibility is `PRIVATE`, and the client
      certificate is a workgroup administrator, then we can see the
      members and administrators of the workgroup.

    * If this workgroup's visibility is `PRIVATE`, and the client
      certificate is **not** a workgroup administrator, then we *cannot*
      see the members and administrators of the workgroup: The sets will
      appear empty.

    :raises EOFError: The workgroup has been deleted.
    """
    if self.deleted:
        raise EOFError('Workgroup has been deleted')
    return self._can_see_membership

# Next, we have the different containers.

@property
def members(self) -> WorkgroupMembership:
    """Access the sets of workgroup members.

    .. note::
       This is the *real* set of members, not the *effective* membership.
       Refer to
       :class:`~stanford.mais.workgroup.properties.WorkgroupFilter` for
       more information.

    See :class:`~stanford.mais.workgroup.member.WorkgroupMembership` for
    information on how to access the sets of member people (SUNetIDs),
    workgroups (fully-qualified names), and certificates (client
    certificate subject common names).

    :raises EOFError: The workgroup has been deleted.
    """
    if self.deleted:
        raise EOFError('Workgroup has been deleted')
    return self._members

@property
def administrators(self) -> WorkgroupMembership:
    """Access the sets of workgroup administrators.

    .. note::
       There is no difference between the *real* and *effective* set of
       workgroup administrators.  Anyone or anything included here has
       administrative power over the workgroup, regardless of filter.

       The one exception is when a nested workgroup has a filter set, in
       which case the filter applies to that workgroup's membership only.

       Refer to
       :class:`~stanford.mais.workgroup.properties.WorkgroupFilter` for
       more information on filters.

    See :class:`~stanford.mais.workgroup.member.WorkgroupMembership` for
    information on how to access the sets of administrator people
    (SUNetIDs), workgroups (fully-qualified names), and certificates
    (client certificate subject common names).

    :raises EOFError: The workgroup has been deleted.
    """
    if self.deleted:
        raise EOFError('Workgroup has been deleted')
    return self._administrators

def __repr__(self) -> str:
    """Return a one-line, constructor-like summary of the workgroup.

    A deleted workgroup is shown as ``Workgroup(deleted)``.  Otherwise the
    result lists the name, description, visibility, reusable, privgroup,
    filter, members, administrators, and last_update, in that order.
    """
    if self.deleted:
        return "Workgroup(deleted)"

    # Each entry is one `key=value` piece.  Only the name and description
    # are wrapped in double-quotes; double-quotes inside the description
    # are backslash-escaped so they are not mistaken for the closing quote.
    fields: list[str] = [
        f"name=\"{self.name}\"",
        "description=\"{}\"".format(self.description.replace('"', '\\"')),
        f"visibility={self.visibility}",
        f"reusable={self.reusable}",
        f"privgroup={self.privgroup}",
        f"filter={self.filter}",
        f"members={self.members}",
        f"administrators={self.administrators}",
        f"last_update={self.last_update}",
    ]
    return 'Workgroup(' + ','.join(fields) + ')'
[docs] def get_privgroup(self) -> PrivgroupContents: """Generate the current privilege group listing. A privgroup is a list of actual people (no workgroups or certificates). A workgroup has two privgroups: One for members and one for administrators. For the privgroup of members, start with an empty list, and do the following: 1. Add all of this workgroup's members that are people. 2. For each of this workgroup's members that are workgroups, *if the nested workgroup's privgroup property is enabled*, then calculate the nested workgroup's privgroup (members only), and add those people to this list. 3. Remove all duplicates 4. Remove all people who do not meet this workgroup's filter. For the privgroup of administrators, start with an empty list, and do the following: 1. Add all of this workgroup's administrators that are people. 2. For each of this workgroup's administrators that are workgroups, *if the nested workgroup's privgroup property is enabled*, then calculate the nested workgroup's privgroup (**members only**), and add those people to this list. 3. Remove all duplicates 4. Remove all people who do not meet this workgroup's filter. Since the privgroup's contents potentially depend on ths membership of other workgroups, and may also depend on the affiliation of each member, the workgroup's privgroup must be fetched when it is needed. .. note:: To get the privgroup for a workgroup, your client certificate must be able to see the membership of this workgroup, otherwise a `PermissionError` will be raised. See :meth:`can_see_membership`. .. warning:: It is possible that your client certificate gained administrator access between this instance's creation, and now. If you think that happened, call :meth:`refresh` before calling this method. It is also possible that your client certificate *lost* administrator access, or that a workgroup was made private. In that case, this method will return a PermissionError even though :meth:`can_see_membership` returns `True`. .. 
danger:: It is also possible that someone else has deleted the workgroup. If that happens, the :attr:`deleted` property will be set and a :class:`WorkgroupDeleted` exception will be raised. :raises ChildProcessError: Something went wrong on the server side (a 400 or 500 error was returned). :raises PermissionError: You did not use a valid certificate, or do not have permissions to perform the operation. :raises WorkgroupDeleted: The workgroup has been deleted. :raises NotImplementedError: Received an unexpected HTTP response code. :raises requests.Timeout: The MaIS Workgroup API did not respond in time. """ if self.deleted: raise EOFError('Workgroup has been deleted') debug(f"In get_privgroup for {self.name}") # If we cannot see a workgroup's membership, then we won't be able to # see the privgroup, so don't bother making the API call. # NOTE: We still catch 401/403 errors, though, just in case permissions # changed since we first fetched the workgroup. if not self.can_see_membership: raise PermissionError() # Our URL includes the fully-qualified workgroup name, plus `privgroup` url_fragment = ( pathlib.PurePosixPath(self.name) / pathlib.PurePosixPath('privgroup') ) response = self.client.client.session.get( self.client._url( fragment=url_fragment, ), ) # Catch various errors match response.status_code: case 400: response_json = None try: response_json = response.json() except requests.exceptions.JSONDecodeError: pass # Did our workgroup go inactive out from under us? 
if ( response_json is not None and response_json['notification'] == 'Workgroup is inactive' ): error(f"Already-instanced workgroup {self.name} has been deleted") self._mark_deleted() raise WorkgroupDeleted(self.name) else: # We have a generic 400 error raise ChildProcessError(response.text) case 500: error(f"Upstream API error: {response.text}") raise ChildProcessError(response.text) case 401 | 403: warning(f"Permission error on get {self.name}") raise PermissionError(response.text) case _ if response.status_code != 200: raise NotImplementedError(response.text) # Did we get a response? debug('Got a response!') # Get results, and make containers for holding results results = response.json() administrators: set[PrivgroupEntry] = set() members: set[PrivgroupEntry] = set() # Process the results and return for administrator in results['administrators']: administrators.add(PrivgroupEntry.from_json(administrator)) for member in results['members']: members.add(PrivgroupEntry.from_json(member)) debug(f"Returning {len(administrators)} admins and {len(members)} members") return PrivgroupContents( members=members, administrators=administrators, )
#
# "U" methods.
#

# Handle the last-updated date in its own call.

def _reset_last_update(self) -> None:
    """Reset the "last-updated" date to today.

    "Today" is the current date in the ``America/Los_Angeles`` time zone.

    This method is called by anything that updates any part of the
    workgroup.  It's a separate method from :meth:`_update` because this
    can also be called by workgroup-membership and integration classes.
    """
    debug(f"In _reset_last_update for {self.name}")
    utc_now = datetime.datetime.now(tz=datetime.timezone.utc)
    stanford_tz = zoneinfo.ZoneInfo(key='America/Los_Angeles')
    self._last_update = utc_now.astimezone(tz=stanford_tz).date()

# The non-container properties are all updated with the same type of call.
# So, make a method to implement that call.

def _update(
    self,
    name: Literal[
        'description',
        'filter',
        'privgroup',
        'reusable',
        'visibility',
    ],
    value: str,
) -> None:
    """Update a workgroup's property.

    This is used by the property setters in this class.  It sends the
    requested change to the Workgroup API as a PUT whose JSON body maps
    the property name to its new value.

    The change might fail, due to a permission error or due to the
    workgroup being deleted from under us.  Or the change might succeed,
    and the response from the Workgroup API will be the full workgroup
    record (which includes our just-made change).  Either way, that sounds
    a lot like a refresh, so the response is handed to the
    refresh-handling code!

    :param name: The name of the property to change.

    :param value: The new value for the property, already converted to
        the string form that is sent to the API.

    :raises ChildProcessError: Something went wrong on the server side (a
        400 or 500 error was returned).

    :raises PermissionError: You did not use a valid certificate, or do not
        have permissions to perform the operation.

    :raises NotImplementedError: Received an unexpected HTTP response code.

    :raises requests.Timeout: The MaIS Workgroup API did not respond in
        time.

    :raises EOFError: The workgroup has been deleted.
    """
    if self.deleted:
        raise EOFError('Workgroup has been deleted')

    # Make the request for the Workgroup.
    session = self.client.client.session
    response = session.put(
        self.client._url(fragment=self.name),
        json={name: value},
    )

    # At this point, it's as if we did a refresh, so send off to the
    # refresh-handling code.
    return self._handle_refresh(response)

# Setters for properties defined in the "R" section.
# NOTE: These setters all have the "no-redef" and "attr-defined" checks
# ignored in MyPy.  This is because MyPy can't handle property setters
# being anything other than immediately after the corresponding getter.
# See https://github.com/python/mypy/issues/1465

@description.setter # type: ignore[no-redef,attr-defined]
def description(
    self,
    value: str,
) -> None:
    """Set the new description.

    See :py:property:`description` for information on this property.  You
    must be a workgroup administrator to use this property setter.

    :raises IndexError: The proposed description is too short (empty) or
        too long (more than 255 characters).

    :raises ValueError: The proposed description has invalid characters:
        it is not encodable in ISO 8859-1, or it is not printable.

    :raises ChildProcessError: Something went wrong on the server side (a
        400 or 500 error was returned).

    :raises PermissionError: You did not use a valid certificate, or do not
        have permissions to perform the operation.

    :raises requests.Timeout: The MaIS Workgroup API did not respond in
        time.

    :raises EOFError: The workgroup has been deleted.
    """
    if self.deleted:
        raise EOFError('Workgroup has been deleted')

    # Length checks come first: empty and over-long are both rejected.
    length = len(value)
    if length == 0:
        error('Proposed description is too short')
        raise IndexError('description')
    if length > 255:
        raise IndexError(self.name)

    # The description must fit into ISO 8859-1 (latin1) ...
    try:
        value.encode('latin1')
    except UnicodeError:
        error('Proposed description is not encodable in ISO 8859-1')
        raise ValueError('description')
    else:
        # ... and must be made up entirely of printable characters.
        if not value.isprintable():
            error("Proposed description has non-printable characters")
            raise ValueError('description')

    # Make the change in the API, which triggers a refresh.
    self._update('description', value)

@filter.setter # type: ignore[no-redef,attr-defined]
def filter(
    self,
    value: str | WorkgroupFilter,
) -> None:
    """Set the new filter.

    See :py:property:`filter` for information on this property.  You must
    be a workgroup administrator to use this property setter.

    A non-enum `value` is parsed with :meth:`WorkgroupFilter.from_str`
    before the change is sent.

    :raises ChildProcessError: Something went wrong on the server side (a
        400 or 500 error was returned).

    :raises PermissionError: You did not use a valid certificate, or do not
        have permissions to perform the operation.

    :raises requests.Timeout: The MaIS Workgroup API did not respond in
        time.

    :raises EOFError: The workgroup has been deleted.
    """
    if self.deleted:
        raise EOFError('Workgroup has been deleted')

    new_filter = (
        value
        if isinstance(value, WorkgroupFilter)
        else WorkgroupFilter.from_str(value)
    )

    # Make the change in the API, which triggers a refresh.
    self._update('filter', str(new_filter))

@privgroup.setter # type: ignore[no-redef,attr-defined]
def privgroup(
    self,
    value: bool
) -> None:
    """Set the new privgroup value.

    See :py:property:`privgroup` for information on this property.  You
    must be a workgroup administrator to use this property setter.

    :raises TypeError: `value` was not a :class:`bool`.

    :raises ChildProcessError: Something went wrong on the server side (a
        400 or 500 error was returned).

    :raises PermissionError: You did not use a valid certificate, or do not
        have permissions to perform the operation.

    :raises requests.Timeout: The MaIS Workgroup API did not respond in
        time.

    :raises EOFError: The workgroup has been deleted.
    """
    if self.deleted:
        raise EOFError('Workgroup has been deleted')
    if not isinstance(value, bool):
        raise TypeError(value)

    # The API takes the strings 'TRUE' and 'FALSE', not JSON booleans.
    # Make the change in the API, which triggers a refresh.
    self._update('privgroup', 'TRUE' if value else 'FALSE')

@reusable.setter # type: ignore[no-redef,attr-defined]
def reusable(
    self,
    value: bool,
) -> None:
    """Set the new reusable value.

    See :py:property:`reusable` for information on this property.  You
    must be a workgroup administrator to use this property setter.

    :raises TypeError: `value` was not a :class:`bool`.

    :raises ChildProcessError: Something went wrong on the server side (a
        400 or 500 error was returned).

    :raises PermissionError: You did not use a valid certificate, or do not
        have permissions to perform the operation.

    :raises requests.Timeout: The MaIS Workgroup API did not respond in
        time.

    :raises EOFError: The workgroup has been deleted.
    """
    if self.deleted:
        raise EOFError('Workgroup has been deleted')
    if not isinstance(value, bool):
        raise TypeError(value)

    # The API takes the strings 'TRUE' and 'FALSE', not JSON booleans.
    # Make the change in the API, which triggers a refresh.
    self._update('reusable', 'TRUE' if value else 'FALSE')

@visibility.setter # type: ignore[no-redef,attr-defined]
def visibility(
    self,
    value: WorkgroupVisibility | str,
) -> None:
    """Set the new visibility.

    See :py:property:`visibility` for information on this property.  You
    must be a workgroup administrator to use this property setter.

    A non-enum `value` is parsed with
    :meth:`WorkgroupVisibility.from_str` before the change is sent.

    :raises ChildProcessError: Something went wrong on the server side (a
        400 or 500 error was returned).

    :raises PermissionError: You did not use a valid certificate, or do not
        have permissions to perform the operation.

    :raises requests.Timeout: The MaIS Workgroup API did not respond in
        time.

    :raises EOFError: The workgroup has been deleted.
    """
    if self.deleted:
        raise EOFError('Workgroup has been deleted')

    new_visibility = (
        value
        if isinstance(value, WorkgroupVisibility)
        else WorkgroupVisibility.from_str(value)
    )

    # Make the change in the API, which triggers a refresh.
    self._update('visibility', str(new_visibility))

#
# "D" Methods.
#
[docs] def delete(self) -> None: """Delete the workgroup. You must be an administrator in order to delete it. .. warning:: Once a workgroup has been deleted, you may not create a new workgroup with the same name. Stem owners may restore deleted workgroups through the Workgroup Manager web site. .. note:: It is possible that the workgroup was already deleted by someone else. If that happens, we will update the instance accordingly, and then raise a :class:`WorkgroupDeleted` exception. If you don't care about this case, catch the exception and then continue as normal. :raises ChildProcessError: Something went wrong on the server side (a 400 or 500 error was returned). :raises PermissionError: You did not use a valid certificate, or do not have permissions to perform the operation. :raises WorkgroupDeleted: The workgroup has been deleted unexpectedly. :raises NotImplementedError: Received an unexpected HTTP response code. :raises requests.Timeout: The MaIS Workgroup API did not respond in time. """ if self.deleted: raise EOFError('Workgroup has (already) been deleted') # Make the request to delete the Workgroup. response = self.client.client.session.delete( self.client._url( fragment=self.name ), ) # A success is easy to handle, so we can put everything into one big # match. match response.status_code: case 200: # The deletion worked debug('Got a response!') self._mark_deleted() case 400: response_json = None try: response_json = response.json() except requests.exceptions.JSONDecodeError: pass # Did our workgroup go inactive out from under us? 
if ( response_json is not None and response_json['notification'] == 'Workgroup is inactive' ): error(f"Already-instanced workgroup {self.name} has been deleted") self._mark_deleted() raise WorkgroupDeleted(self.name) else: # We have a generic 400 error raise ChildProcessError(response.text) case 500: error(f"Upstream API error: {response.text}") raise ChildProcessError(response.text) case 401 | 403: warning(f"Permission error on get {self.name}") raise PermissionError(response.text) case 404: warning(f"Workgroup {self.name} not found") raise KeyError(self.name) case _: raise NotImplementedError(response.text) # All done! return None
# # Support Methods #
[docs] @staticmethod def datestr_to_date( datestr: str ) -> datetime.date: """Convert a Workgroups API date-string to a DateTime Date :param datestr: A string in the form "01-Jan-2020" :returns: A DateTime Date. :raises ValueError: The provided string could not be parsed """ debug(f"Splitting date-string(?) \"{datestr}\"") # Split the string into pieces pieces = datestr.split('-') if len(pieces) != 3: raise ValueError('String did not have three components') # Try converting the day and year. # int() raises a ValueError on problems, so pass that to the caller! day_number = int(pieces[0]) year_number = int(pieces[2]) # Convert the month into a number month_to_number = { 'JAN': 1, 'FEB': 2, 'MAR': 3, 'APR': 4, 'MAY': 5, 'JUN': 6, 'JUL': 7, 'AUG': 8, 'SEP': 9, 'OCT': 10, 'NOV': 11, 'DEC': 12, } month_str_upper = pieces[1].upper() if month_str_upper not in month_to_number: raise ValueError(f"Did not recognize month ''{pieces[1]}") month_number = month_to_number[month_str_upper] # All done! result = datetime.date( year=year_number, month=month_number, day=day_number, ) debug(f"Converted date-string to {result}") return result