# vim: ts=4 sw=4 et
# -*- coding: utf-8 -*-
# © 2021 The Board of Trustees of the Leland Stanford Junior University.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# This file has a ton of references to classes that are defined in the same
# file. Pythons older than 3.14 (which implements PEP 649) cannot handle that
# natively without this import.
# NOTE: At some point in the future, this annotation will be deprecated.
from __future__ import annotations
# Start with stdlib imports
from collections.abc import Callable, MutableMapping
import datetime
import dataclasses
import logging
import requests
from typing import Literal
import urllib.parse
# Finally, do local imports
from stanford.mais.account.account import Account
import stanford.mais.client
# Module-level logger, plus short aliases for the levels this file uses.
logger = logging.getLogger(__name__)
debug, info, warn = logger.debug, logger.info, logger.warning

# This module is the root of the package, so library-wide logging
# configuration happens here: attach a do-nothing handler to the package
# logger.
# See https://docs.python.org/3/howto/logging.html#configuring-logging-for-a-library
logging.getLogger('stanford.mais.account').addHandler(
    logging.NullHandler(),
)
# Names exported by ``from <this module> import *``.
__all__ = ('AccountClient', 'Account')
# Define a type for search results
[docs]
@dataclasses.dataclass(frozen=True, slots=True)
class PartialAccount:
"""Part of a Stanford Account.
This is a "lite" account record. Lite accounts are returned by the
Workgroup API in two cases:
* When you fetch a lite account, instead of a full account (we never do
this).
* When you request a list of accounts that have changed status recently.
The entire contents are read-only.
To get the full account, call the :meth:`~PartialAccount.account` method to
getch a full account record.
"""
sunetid: str
"""
For people, their SUNetID is also their username. For functional accounts,
this is their username. This is the ``id`` key from the API.
.. tip::
This is used as the account's ``uid`` in LDAP.
"""
is_person: bool
"""
If `True`, this account is for a person. If `False`, this account is for a
"functional account". This is the ``type`` key from the API.
"""
is_active: bool
"""
If `True`, this account is active. If the account is for a person, then
assuming the person's kerberos service is not frozen or otherwise blocked,
they are able to use Stanford Login. If the account is a functional
account, then the associated services are active. This does not imply
anything else. This is computed from the ``status`` key from the API.
"""
last_update: datetime.datetime
"""
The timezone-aware datetime when the account was last updated. This is
computed from the ``statusDateStr`` key from the API.
"""
@classmethod
def from_json(
cls,
source: dict[str, str | int],
) -> PartialAccount:
debug(f"Creating PartialAccount for source['type'] {source['id']} ")
# Check sunetid type
if not isinstance(source['id'], str):
raise TypeError('Unexpected type for "id"')
# Is the account for a person, or functional?
account_type = source['type']
if not isinstance(account_type, str):
raise TypeError('Unexpected type for "type"')
if account_type == 'self':
is_person = True
elif account_type == 'functional':
is_person = False
else:
raise NotImplementedError(f"Unexpected account type '{account_type}'")
# Compute last_updated
if not isinstance(source['statusDateStr'], str):
raise TypeError('Unexpected type for "statusDateStr"')
last_update=datetime.datetime.strptime(
source['statusDateStr'],
'%Y-%m-%dT%H:%M:%S.%fZ'
).replace(tzinfo=datetime.timezone.utc)
return cls(
sunetid=source['id'],
is_person=is_person,
is_active=(True if source['status'] == 'active' else False),
last_update=last_update,
)
[docs]
def account(
self,
client: AccountClient,
) -> Account:
"""Fetch the full :class:`Account` for this search result.
This is a convenience method, performing a normal account lookup and
returning the result.
.. warning::
The returned Account may have a different `is_active` or
`last_update` value. Once you call this method, you should stop
using this PartialAccount.
.. note::
The returned Account will have the same `is_person` value as this
partial account.
:param client: The AccountClient to use for the lookup.
:returns: The full :class:`Account` for this PartialAccount's SUNetID.
:raises ChildProcessError: Something went wrong on the server side (a
400 or 500 error was returned).
:raises KeyError: The SUNetID changed between now and when this
PartialAccount was returned in search results. This is extremely
rare.
:raises PermissionError: You did not use a valid certificate, or do not
have permissions to perform the operation. This is also rare, and
means you either changed certificates, or your certificate has been
disabled or expired.
:raises requests.Timeout: The MaIS Workgroup API did not respond in time.
"""
return client.get(self.sunetid)
# Next, define the client class. Then, define the actual account.
[docs]
@dataclasses.dataclass(frozen=True)
class AccountClient():
"""
The :class:`AccountClient` is the second thing you will instantiate when
you want to interact with the MaIS Account API. (The first thing you
instantiate is a :class:`~stanford.mais.client.MAISClient`). Once you have
a :class:`~stanford.mais.client.MAISClient`, you pass it to the
:class:`AccountClient` constructor with this parameter:
:param stanford.mais.client.MAISClient client: The MAIS client to use.
Once you have an :class:`AccountClient` instantiated, you can use
:meth:`get` to fetch an account. For your convenience, instances of this
class also implement ``__getitem__``, so instead of doing…
.. code-block:: default
aclient = AccountClient(...)
lelandjr = client.get('lelandjr')
… you can do …
.. code-block:: default
aclient = AccountClient(...)
lelandjr = client['lelandjr']
Instances also implement :class:`~collections.abc.Container` functionality,
so you can check for account existence like so:
.. code-block:: default
aclient = AccountClient(...)
lelandjr_exists = (True if 'lelandjr' in aclient else False)
Through the use of caching, if you then decide to fetch the account
after confirming its existance, the entry will be served from cache
instead of making a fresh API request.
.. warning::
The existence of an account does not mean it is active!
"""
client: stanford.mais.client.MAISClient
"""A :class:`~stanford.mais.client.MAISClient` instance.
This configures the API endpoint (accessed via ``client.urls['account']``)
and client key/cert to use. It must be provided when calling the class
constructor.
:raises TypeError: A client was not provided.
"""
_cache: MutableMapping[str, Account] = dataclasses.field(repr=False, default_factory=dict)
"""Cache of already-seen accounts.
This cache is used to store :class:`Account` instances already seen by this
client. It speeds up repeated accesses of accounts.
"""
_filters: frozenset[Callable[[Account], bool]] = dataclasses.field(repr=False, default_factory=frozenset)
"""Set of filters to apply on lookups.
The callables in this frozen set are evaluated when ``get()`` is called,
and has found an Account. Each callable will be called with the Account as
the only parameter. If any callable returns a ``False``, then a
``KeyError`` will be raised, as if the lookup failed.
.. note::
If a callable returns ``False``, then there is no guarantee that all
callables will be called before the exception is raised. The only
situation where all callables will be called is when all callables
return ``True``.
"""
def __post_init__(self) -> None:
"""Check provided constructor variables.
This checks the provided client, and (if needed) sets up the Requests
session.
:raises TypeError: A client was not provided.
"""
# Check the client type
if not isinstance(self.client, stanford.mais.client.MAISClient):
raise TypeError('client')
# That's it!
return None
[docs]
def get(
self,
sunetid: str,
) -> Account:
"""Fetch an Account.
This is a convenience wrapper around :meth:`Account.get`. All other
parameters provided are passed through to :meth:`~Account.get`, and the
resulting instance is returned.
Refer to :meth:`Account.get` for details on parameters, exceptions,
etc..
"""
# Fetch our account
account = Account.get(
client=self,
sunetid=sunetid,
)
# Check against filters
for filter_func in self._filters:
if filter_func(account) is False:
raise KeyError(sunetid)
# All filters passed!
return account
def __getitem__(
self,
item: str,
) -> Account:
"""Fetch an Account.
This works exactly like :meth:`get`. See :meth:`get` and
:meth:`Account.get` for more information.
"""
return self.get(item)
def __contains__(
self,
sunetid: str
) -> bool:
"""Check for SUNetID existance.
:param sunid: The account ID to check for existence.
:return: `True` if the account exists, else `False`.
"""
try:
self.get(sunetid)
return True
except KeyError:
return False
[docs]
def clear_cache(
self,
) -> None:
"""Clear cache of accounts.
As mentioned in the class docs, visited accounts are cached locally,
for speed and to reduce load on the Account API. Although accounts
rarely change, in long-running
programs, this can be a problem. To assist, this method clears the
cache of this specific Account client.
.. danger::
If you are holding a reference to an existing :class:`Account`,
or to one of the the Account's services, clearing the cache does
not invalidate those references!
This method should not be called unless you know what you are
doing.
"""
debug('in clear_cache')
self._cache.clear()
# Now, let's create some AccountViews!!!
[docs]
def only_active(
self,
) -> AccountClient:
"""Create a modified :class:`AccountClient` that can only see active
accounts.
The returned client instance has been modified so that
:meth:`~AccountClient.get` only returns active accounts. If you
try to look up an inactive account, :meth:`~AccountClient.get` will
act as if the account does not exist.
As many Account API consumers are only interested in active accounts,
you may find this to be very convenient. If this interests you, you
can do something like this:
.. code-block:: default
import stanford.mais.client
from stanford.mais.client.account import AccountClient
api_client = stanford.mais.client.MAISClient(...)
active_accounts = AccountClient(api_client).only_active()
.. tip::
You can stack these filters. For example,
``AccountClient(api_client).only_active().only_people()`` will only
be able to "see" active people accounts (ignoring active functional
accounts).
.. warning::
The 'client' returned by this method uses the same caches as this
client. Therefore, it must not be used across threads/processes.
"""
new_filter = lambda candidate: (True if candidate.is_active else False)
return AccountClient(
client=self.client,
_cache=self._cache,
_filters=self._filters | frozenset((new_filter,))
)
[docs]
def only_inactive(
self,
) -> AccountClient:
"""Create a modified :class:`AccountClient` that can only see inactive
accounts.
The returned client instance has been modified so that
:meth:`~AccountClient.get` only returns inactive accounts. If you
try to look up an active account, :meth:`~AccountClient.get` will
act as if the account does not exist.
See :meth:`~AccountClient.only_active` for examples, tips, and
warnings.
"""
new_filter = lambda candidate: (False if candidate.is_active else True)
return AccountClient(
client=self.client,
_cache=self._cache,
_filters=self._filters | frozenset((new_filter,))
)
[docs]
def only_people(
self,
) -> AccountClient:
"""Create a modified :class:`AccountClient` that can only see
accounts of people.
The returned client instance has been modified so that
:meth:`~AccountClient.get` only returns the accounts of people. If you
try to look up a functional account, :meth:`~AccountClient.get` will
act as if the account does not exist.
As many Account API consumers are only interested in SUNetIDs,
you may find this to be very convenient. If this interests you, you
can do something like this:
.. code-block:: default
import stanford.mais.client
from stanford.mais.client.account import AccountClient
api_client = stanford.mais.client.MAISClient(...)
sunetids = AccountClient(api_client).only_people()
And, if you only care about active SUNetIDs, you can chain them
together, like this:
.. code-block:: default
import stanford.mais.client
from stanford.mais.client.account import AccountClient
api_client = stanford.mais.client.MAISClient(...)
active_sunetids = AccountClient(api_client).only_active().only_people()
.. warning::
The 'client' returned by this method uses the same caches as this
client. Therefore, it must not be used across threads/processes.
"""
new_filter = lambda candidate: (True if candidate.is_person else False)
return AccountClient(
client=self.client,
_cache=self._cache,
_filters=self._filters | frozenset((new_filter,))
)
[docs]
def only_functional(
self,
) -> AccountClient:
"""Create a modified :class:`AccountClient` that can only see
functional accounts.
The returned client instance has been modified so that
:meth:`~AccountClient.get` only returns functional accounts. If you
try to look up a person's account (a SUNetID),
:meth:`~AccountClient.get` will act as if the account does not exist.
See :meth:`~AccountClient.only_people` for examples, tips, and
warnings.
"""
new_filter = lambda candidate: (False if candidate.is_person else True)
return AccountClient(
client=self.client,
_cache=self._cache,
_filters=self._filters | frozenset((new_filter,))
)
# Accounts changed status in the last X days
[docs]
def get_changed_status(
self,
days: int,
current_status: Literal['active', 'inactive', 'pending'],
get_people: bool = True,
) -> frozenset[PartialAccount]:
"""Search for accounts which have recently changed status.
Search for all accounts that have changed status within the specified number
of days. This is commonly used to get a list of accounts that have
changed status
.. warning::
This call **only** includes accounts which have changed status.
Changes to other attributes—like a person's name, or a service
setting like email aliases—will not cause an account to be included
in these results.
.. warning::
The search results will tell you which accounts changed status, but
they will not tell you the old status.
.. note::
This method is not affected by the modified AccountClient instances
returned by :meth:`only_active`, :meth:`only_inactive`,
:meth:`only_people`, and :meth:`only_functional`.
:param days: Include accounts that changed status within this many days.
Must be at least 1, and at most 30.
:param current_status: Only accounts with this current status will be
included in the results.
The status `pending` refers to an account which has just been
created and is not yet active. It is only used at the very
beginning of an account's life cycle.
.. note::
Only one `current_status` value may be provided. If you want a
list of *all* accounts which have changed status, run this search
multiple times (once for each status), and perform a set union
on the results.
.. warning::
It is possible for an account to change status multiple times
during your search period. For example, if you search for
`current_status` ``active``, your search results might include
accounts which were already active, went inactive, and then
became active again.
:param get_people: If True (which is the defaut), only accounts for
people will be included in the results. If you instead want
results for functional accounts, change this to ``False``.
.. note::
The search can return either people accounts or functional
accounts, not both. If you want both, then make two searches
and perform a set union on the results.
:raises ValueError: You provided an invalid `days`.
:raises PermissionError: You did not use a valid certificate, or do not have permissions to perform the operation.
:raises NotImplementedError: An unexpected HTTP response was received.
:raises requests.Timeout: The MaIS Workgroup API did not respond in time.
"""
# Check days
if days < 1:
raise ValueError('days must be at least 1')
if days > 30:
raise ValueError('days must be at most 30')
# Get the Requests session
session = self.client.session
# Do the search.
query = {
'type': ('self' if get_people is True else 'functional'),
'status': current_status,
'statusdays': str(days),
}
info(f"Fetching all {query['type']} changed to {current_status} in the last {days} days…")
response = session.get(
urllib.parse.urljoin(
self.client.urls['account'],
'?' + urllib.parse.urlencode(query),
),
)
# Catch a number of bad errors.
debug(f"Status code is {response.status_code}")
match response.status_code:
case 400 | 500:
raise ChildProcessError(response.text)
case 401 | 403:
raise PermissionError(response.text)
case _ if response.status_code != 200:
raise NotImplementedError(response.text)
# Decode the JSON
info('Parsing response JSON')
response_json = response.json()
# Make our results
return frozenset(list(
(PartialAccount.from_json(result) for result in response_json)
))