client.py 12.7 KB
Newer Older
1
2
3
"""Client for connecting to CIM API"""
import logging
import urllib.parse
Trond Aasan's avatar
Trond Aasan committed
4
from typing import Tuple, Union, List, Iterable, Set, Any, Dict, Optional
5
6
7

import requests

8
from cim_client.models import (
Trond Aasan's avatar
Trond Aasan committed
9
    BaseModel,
10
11
12
    Organisation,
    OrganisationImportResult,
    OrganisationList,
Trond Aasan's avatar
Trond Aasan committed
13
    PersonBase,
14
    PersonList,
Trond Aasan's avatar
Trond Aasan committed
15
    PersonBase_T,
16
)
17
18
19
20

logger = logging.getLogger(__name__)


Trond Aasan's avatar
Trond Aasan committed
21
22
23
JsonType = Any


Trond Aasan's avatar
Trond Aasan committed
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def _serialise_upsert_person(
    person: PersonBase_T,
) -> Dict[str, Any]:
    """
    Make sure empty phone numbers are excluded.

    A phone field is reset when the corresponding key is omitted.
    Neither "" nor null are allowed values.
    """
    empty_phone_fields = {
        f
        for f in {
            "job_mobile",
            "job_phone",
            "secret_number",
            "private_mobile",
            "private_phone",
        }
        if not person.__dict__[f]
    }

    return person.dict(exclude=empty_phone_fields or None)  # type: ignore[arg-type]


Trond Aasan's avatar
Trond Aasan committed
48
def merge_dicts(*dicts: Optional[Dict[Any, Any]]) -> Dict[Any, Any]:
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
    """
    Combine a series of dicts without mutating any of them.

    >>> merge_dicts({'a': 1}, {'b': 2})
    {'a': 1, 'b': 2}
    >>> merge_dicts({'a': 1}, {'a': 2})
    {'a': 2}
    >>> merge_dicts(None, None, None)
    {}
    """
    combined = dict()
    for d in dicts:
        if not d:
            continue
        for k in d:
            combined[k] = d[k]
    return combined


class CimEndpoints:
Trond Aasan's avatar
Trond Aasan committed
69
70
    def __init__(
        self,
Trond Aasan's avatar
Trond Aasan committed
71
        url: str,
72
73
        upsert_persons_url: Optional[str] = None,
        delete_persons_url: Optional[str] = None,
Trond Aasan's avatar
Trond Aasan committed
74
        import_organisations_url: Optional[str] = None,
Trond Aasan's avatar
Trond Aasan committed
75
    ):
76
        """ Get endpoints relative to the CIM API URL. """
77
        self.baseurl = url
78
79
        self.upsert_persons_url = upsert_persons_url or "?ws=contacts/upsert/1.0"
        self.delete_persons_url = delete_persons_url or "?ws=contacts/delete/1.0"
80
        self.import_organisations_url = (
81
            import_organisations_url or "?ws=contacts/organisations/1.0"
82
        )
83

Trond Aasan's avatar
Trond Aasan committed
84
    def __repr__(self) -> str:
Trond Aasan's avatar
Trond Aasan committed
85
        return "{cls.__name__}({url!r})".format(cls=type(self), url=self.baseurl)
86

87
88
    def upsert_persons(self) -> str:
        return urllib.parse.urljoin(self.baseurl, self.upsert_persons_url)
89

90
91
    def delete_persons(self) -> str:
        return urllib.parse.urljoin(self.baseurl, self.delete_persons_url)
92

93
94
    def get_upsert_persons_schema(self) -> str:
        return urllib.parse.urljoin(self.baseurl, self.upsert_persons_url)
95

96
97
    def get_delete_persons_schema(self) -> str:
        return urllib.parse.urljoin(self.baseurl, self.delete_persons_url)
98

Trond Aasan's avatar
Trond Aasan committed
99
    def import_organisations(self) -> str:
100
101
        return urllib.parse.urljoin(self.baseurl, self.import_organisations_url)

Trond Aasan's avatar
Trond Aasan committed
102
    def get_import_organisations_schema(self) -> str:
103
104
        return urllib.parse.urljoin(self.baseurl, self.import_organisations_url)

105
106
107

class CimClient(object):
    default_headers = {
Trond Aasan's avatar
Trond Aasan committed
108
        "Accept": "application/json",
109
110
    }

Trond Aasan's avatar
Trond Aasan committed
111
112
    def __init__(
        self,
Trond Aasan's avatar
Trond Aasan committed
113
114
        endpoints: Union[CimEndpoints, Dict[str, str]],
        headers: Union[None, Dict[str, str]] = None,
Trond Aasan's avatar
Trond Aasan committed
115
116
        return_objects: bool = True,
        use_sessions: bool = True,
Trond Aasan's avatar
Trond Aasan committed
117
        auth: Optional[Tuple[str, str]] = None,
Trond Aasan's avatar
Trond Aasan committed
118
    ):
119
120
        """
        CIM API client.
Trond Aasan's avatar
Trond Aasan committed
121
        :param CimEndpoints endpoints: API endpoints
122
123
124
        :param dict headers: Append extra headers to all requests
        :param bool return_objects: Return objects instead of raw JSON
        :param bool use_sessions: Keep HTTP connections alive (default True)
125
        :param tuple auth: (username, password) for simple http auth
126
        """
Trond Aasan's avatar
Trond Aasan committed
127
128
        if isinstance(endpoints, dict):
            endpoints = CimEndpoints(**endpoints)
129

Trond Aasan's avatar
Trond Aasan committed
130
        self.endpoints = endpoints
Trond Aasan's avatar
Trond Aasan committed
131
132
133

        if isinstance(auth, list):
            auth = tuple(auth)
134
        self.auth = auth if auth else None
Trond Aasan's avatar
Trond Aasan committed
135

136
137
138
139
140
        self.headers = merge_dicts(self.default_headers, headers)
        self.return_objects = return_objects
        if use_sessions:
            self.session = requests.Session()
        else:
Trond Aasan's avatar
Trond Aasan committed
141
            self.session = requests  # type: ignore
142

Trond Aasan's avatar
Trond Aasan committed
143
    def _build_request_headers(self, headers: Dict[str, str]) -> Dict[str, str]:
144
145
146
        request_headers = {}
        for h in self.headers:
            request_headers[h] = self.headers[h]
Trond Aasan's avatar
Trond Aasan committed
147
        for h in headers or ():
148
149
150
            request_headers[h] = headers[h]
        return request_headers

Trond Aasan's avatar
Trond Aasan committed
151
152
    def call(
        self,
Trond Aasan's avatar
Trond Aasan committed
153
154
155
156
157
158
159
160
        method_name: str,
        url: str,
        headers: Optional[Dict[str, str]] = None,
        params: Optional[Dict[str, str]] = None,
        return_response: bool = True,
        **kwargs: Any
    ) -> Union[requests.Response, JsonType]:
        headers = self._build_request_headers(headers or {})
161
162
        if params is None:
            params = {}
Trond Aasan's avatar
Trond Aasan committed
163
164
165
166
167
168
169
170
171
        logger.debug(
            "Calling %s %s with params=%r",
            method_name,
            urllib.parse.urlparse(url).path,
            params,
        )
        r = self.session.request(
            method_name, url, headers=headers, params=params, **kwargs
        )
172
        if r.status_code in (500, 400, 401):
Trond Aasan's avatar
Trond Aasan committed
173
            logger.warning("Got HTTP %d: %r", r.status_code, r.content)
174
175
176
177
178
        if return_response:
            return r
        r.raise_for_status()
        return r.json()

Trond Aasan's avatar
Trond Aasan committed
179
    def get(self, url: str, **kwargs: Any) -> Union[requests.Response, JsonType]:
Trond Aasan's avatar
Trond Aasan committed
180
        return self.call("GET", url, **kwargs)
181

Trond Aasan's avatar
Trond Aasan committed
182
    def put(self, url: str, **kwargs: Any) -> Union[requests.Response, JsonType]:
Trond Aasan's avatar
Trond Aasan committed
183
        return self.call("PUT", url, **kwargs)
184

Trond Aasan's avatar
Trond Aasan committed
185
    def post(self, url: str, **kwargs: Any) -> Union[requests.Response, JsonType]:
Trond Aasan's avatar
Trond Aasan committed
186
        return self.call("POST", url, **kwargs)
187

Trond Aasan's avatar
Trond Aasan committed
188
189
190
    def object_or_data(
        self, cls: BaseModel, data: JsonType
    ) -> Union[BaseModel, JsonType]:
191
192
193
194
        if not self.return_objects:
            return data
        return cls.from_dict(data)

195
    def get_upsert_persons_schema(self) -> JsonType:
196
197
198
199
200
201
202
203
        """GETs the current schema from CIM

        The schema can change depending on the installation.

        It may be a good idea to setup some sort of daily(?)cron job that
        checks whether the schema has changed lately so you don't update
        using an outdated schema.
        """
204
        url = self.endpoints.get_upsert_persons_schema()
Trond Aasan's avatar
Trond Aasan committed
205
        response = self.get(
206
207
            url,
            auth=self.auth,
Andreas Ellewsen's avatar
Andreas Ellewsen committed
208
            return_response=False,
Trond Aasan's avatar
Trond Aasan committed
209
        )
Andreas Ellewsen's avatar
Andreas Ellewsen committed
210
        return response
211

212
    def get_delete_persons_schema(self) -> JsonType:
213
214
215
216
217
218
219
220
        """GETs the current schema from CIM

        The schema can change depending on the installation.

        It may be a good idea to setup some sort of daily(?)cron job that
        checks whether the schema has changed lately so you don't update
        using an outdated schema.
        """
221
        url = self.endpoints.get_delete_persons_schema()
Trond Aasan's avatar
Trond Aasan committed
222
        response = self.get(
223
224
            url,
            auth=self.auth,
Andreas Ellewsen's avatar
Andreas Ellewsen committed
225
            return_response=False,
Trond Aasan's avatar
Trond Aasan committed
226
        )
Andreas Ellewsen's avatar
Andreas Ellewsen committed
227
        return response
228

229
    def upsert_persons(
Trond Aasan's avatar
Trond Aasan committed
230
        self,
Trond Aasan's avatar
Trond Aasan committed
231
        persondata: Union[List[PersonBase], PersonBase, PersonList],
Trond Aasan's avatar
Trond Aasan committed
232
    ) -> Optional[Tuple[str, bytes]]:
Trond Aasan's avatar
Trond Aasan committed
233
        """Update one or more persons
234
235
236
237
238
239
240
241

        Note that updating more than one person at a time may require parsing
        the content of the response.
        The reason behind this is that you will not get a 404 if at least one
        person in the list was found.
        Note also that the response will not contain info about failing to find
        someone. You will have to compare with what you posted.
        """
242
        url = self.endpoints.upsert_persons()
243

Trond Aasan's avatar
Trond Aasan committed
244
245
246
247
248
        if isinstance(persondata, PersonList):
            persondata = persondata.persons
        elif isinstance(persondata, PersonBase):
            persondata = [persondata]
        elif not isinstance(persondata, list):
Trond Aasan's avatar
Trond Aasan committed
249
            raise TypeError("persondata must be List of Persons or Person")
250

Trond Aasan's avatar
Trond Aasan committed
251
252
253
        data = [_serialise_upsert_person(x) for x in persondata]
        headers = {"Content-Type": "application/json"}
        response = self.post(url, json=data, auth=self.auth, headers=headers)
254
255
256
257
        if response.status_code == 200:
            logger.debug("import success, response was: %s", response.content)
            return "Import success", response.content
        response.raise_for_status()
Trond Aasan's avatar
Trond Aasan committed
258
        return None
259

260
    def delete_persons(
Trond Aasan's avatar
Trond Aasan committed
261
        self,
Trond Aasan's avatar
Trond Aasan committed
262
        person_ids: List[str],
Trond Aasan's avatar
Trond Aasan committed
263
    ) -> Union[Tuple[str, bytes], None]:
264
265
        """Convenience method for deletion

Trond Aasan's avatar
Trond Aasan committed
266
267
268
269
        The request body should contain an array of unique identifiers
        for the users or contacts you want to delete. You can find out
        which property is used as a unique identifier in the installation
        by generating a JSON schema.
270

Trond Aasan's avatar
Trond Aasan committed
271
        :param person_ids: The person or people we want to delete from CIM
272
273
        :return: String describing status or None if not found
        """
274
        url = self.endpoints.delete_persons()
Trond Aasan's avatar
Trond Aasan committed
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
        headers = {"Content-Type": "application/json"}
        response = self.post(url, json=person_ids, auth=self.auth, headers=headers)

        if (
            response.status_code == 404
            and response.text == "No person found: No deletion"
        ):
            logger.warning(
                "No persons were deleted, response was: %s", response.content
            )
            return None

        if response.status_code == 200:
            logger.debug("Delete success, response was: %s", response.content)
            return "Delete success", response.content

        response.raise_for_status()
        return None
293

Trond Aasan's avatar
Trond Aasan committed
294
    def get_import_organisations_schema(self) -> JsonType:
295
296
297
298
299
300
301
302
303
304
        """GETs the current schema from CIM

        The schema can change depending on the installation.

        It may be a good idea to setup some sort of daily(?)cron job that
        checks whether the schema has changed lately so you don't update
        using an outdated schema.
        """
        url = self.endpoints.get_import_organisations_schema()
        response = self.get(
305
306
            url,
            auth=self.auth,
Andreas Ellewsen's avatar
Andreas Ellewsen committed
307
            return_response=False,
308
        )
Andreas Ellewsen's avatar
Andreas Ellewsen committed
309
        return response
310
311

    def import_organisations(
Trond Aasan's avatar
Trond Aasan committed
312
        self, organisations: Union[Iterable[Organisation], OrganisationList]
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
    ) -> Union[bool, OrganisationImportResult]:
        """Import organisation tree into CIM

        :param organisations: The list of organisations to import
        :return: OrganisationImportResult or ['No changes'] if no changes

        From CIM API documentation:

        NB! All imported organization units must be present at all times in the
        import to prevent deletion.

        NB! Note that the “parent” property of each organization unit in the
        import data should contain a valid organization key or be left empty. If
        “parent” contains a non-valid key at first-time import, the unit and its
        subordinate units will not be imported. If “parent” contains a non-valid
        key when updates are made to an existing unit, the unit and all its
        subordinate units will be deleted.

        Result:

        The import resulted in several organisations/departments being inserted:

            {
              "insert":["ORG-123","ORG-124"],
              "update":["ORG-121","ORG-21","ORG-211"],
              "delete":["ORG-121w"]
            }

        NB! If an organisation had been deleted as a result of a previous
        import, and was afterwards restored via import, the result will be sent
        as “update”.

        No changes were made as a result of the import:

            ["No changes"]
        """
        if not isinstance(organisations, OrganisationList):
            organisations = OrganisationList(__root__=list(organisations))  # noqa

        # Api doesn't give any feedback on unknown parents, log them
        unknown_parents = _find_unknown_parents(organisations.__root__)
        if unknown_parents:
            logger.warning("%s keys not found", len(unknown_parents))
            logger.debug("Unknown keys: %s", unknown_parents)

        url = self.endpoints.import_organisations()
Andreas Ellewsen's avatar
Andreas Ellewsen committed
359
        result: JsonType = self.post(
360
            url,
Trond Aasan's avatar
Trond Aasan committed
361
            json=organisations.dict()["__root__"],
362
            auth=self.auth,
Andreas Ellewsen's avatar
Andreas Ellewsen committed
363
            return_response=False,
364
365
366
367
368
        )

        if isinstance(result, dict):
            return OrganisationImportResult.from_dict(result)

Trond Aasan's avatar
Trond Aasan committed
369
        if result != ["No changes"]:
Trond Aasan's avatar
Trond Aasan committed
370
            logger.warning("Unrecognized result: %s", result)
371
372
373
374
375

        return result


def _find_unknown_parents(organisations: List[Organisation]) -> Set[str]:
Trond Aasan's avatar
Trond Aasan committed
376
    keys = {x.key for x in organisations}
Trond Aasan's avatar
Trond Aasan committed
377
    parent_keys = set(
Trond Aasan's avatar
Trond Aasan committed
378
        filter(lambda x: bool(x), map(lambda x: x.parent_key, organisations))
Trond Aasan's avatar
Trond Aasan committed
379
    )
380
381

    if not parent_keys.issubset(keys):
Trond Aasan's avatar
Trond Aasan committed
382
        return parent_keys - keys
383
384
385

    return set()

386

Trond Aasan's avatar
Trond Aasan committed
387
def get_client(config_dict: JsonType) -> CimClient:
388
389
390
391
    """
    Get a CimClient from configuration.
    """
    return CimClient(**config_dict)