Les APIs REST permettent aux applications de communiquer via HTTP. Python excelle
pour consommer des APIs (avec requests) et en créer (avec Flask/FastAPI).
Maîtrisez GET, POST, PUT, DELETE, authentification et gestion d'erreurs.
Consommer une API
GET - Récupérer données
import requests
# GET simple
response = requests.get('https://api.example.com/users')
# Vérifier statut
if response.status_code == 200:
data = response.json() # Parser JSON automatiquement
print(data)
else:
print(f"Erreur: {response.status_code}")
# Paramètres query string
params = {
'page': 2,
'limit': 10,
'sort': 'name'
}
response = requests.get('https://api.example.com/users', params=params)
# URL finale: https://api.example.com/users?page=2&limit=10&sort=name
# Headers personnalisés
headers = {
'Accept': 'application/json',
'User-Agent': 'MonApp/1.0'
}
response = requests.get(url, headers=headers)
# Récupérer ressource spécifique
user_id = 123
response = requests.get(f'https://api.example.com/users/{user_id}')
POST - Créer ressource
# Données JSON
nouveau_user = {
'name': 'Nicolas Lema',
'email': 'nicolas@example.com',
'role': 'developer'
}
response = requests.post(
'https://api.example.com/users',
json=nouveau_user # Convertit auto en JSON et ajoute Content-Type
)
if response.status_code == 201:
user_cree = response.json()
print(f"User créé avec ID: {user_cree['id']}")
else:
print(f"Erreur: {response.status_code}")
print(response.json()) # Message d'erreur
# Formulaire (form-data)
data = {
'username': 'nicolas',
'password': 'secret123'
}
response = requests.post(url, data=data)
# Fichier upload
files = {'file': open('document.pdf', 'rb')}
response = requests.post(url, files=files)
PUT / PATCH - Modifier
# PUT - Remplacement complet
user_modifie = {
'name': 'Nicolas Lema Alves',
'email': 'nicolas.lema@example.com',
'role': 'senior developer'
}
response = requests.put(
'https://api.example.com/users/123',
json=user_modifie
)
# PATCH - Modification partielle (seulement champs modifiés)
modifications = {
'email': 'nouveau@example.com'
}
response = requests.patch(
'https://api.example.com/users/123',
json=modifications
)
DELETE - Supprimer
response = requests.delete('https://api.example.com/users/123')
if response.status_code == 204:
print("User supprimé")
elif response.status_code == 404:
print("User inexistant")
else:
print(f"Erreur: {response.status_code}")
Authentification
API Key
# Dans header
headers = {
'Authorization': 'Bearer YOUR_API_KEY',
# OU
'X-API-Key': 'YOUR_API_KEY'
}
response = requests.get(url, headers=headers)
# Dans query string
params = {'api_key': 'YOUR_API_KEY'}
response = requests.get(url, params=params)
# Utiliser variables d'environnement
import os
from dotenv import load_dotenv
load_dotenv() # Charge .env
API_KEY = os.getenv('API_KEY')
headers = {'Authorization': f'Bearer {API_KEY}'}
Basic Auth
from requests.auth import HTTPBasicAuth
response = requests.get(
url,
auth=HTTPBasicAuth('username', 'password')
)
# Ou raccourci
response = requests.get(url, auth=('username', 'password'))
OAuth 2.0
from requests_oauthlib import OAuth2Session
# Configuration
client_id = 'YOUR_CLIENT_ID'
client_secret = 'YOUR_CLIENT_SECRET'
token_url = 'https://api.example.com/oauth/token'
# Obtenir token
oauth = OAuth2Session(client_id)
token = oauth.fetch_token(
token_url,
client_secret=client_secret,
username='user',
password='pass'
)
# Utiliser token
headers = {'Authorization': f"Bearer {token['access_token']}"}
response = requests.get(url, headers=headers)
Gestion d'Erreurs
import requests
from requests.exceptions import RequestException, Timeout, HTTPError
def appel_api_securise(url, max_retries=3):
"""Appel API avec gestion erreurs et retry"""
for tentative in range(max_retries):
try:
response = requests.get(url, timeout=10)
response.raise_for_status() # Lève HTTPError si 4xx ou 5xx
return response.json()
except Timeout:
print(f"Timeout (tentative {tentative + 1}/{max_retries})")
if tentative == max_retries - 1:
raise
except HTTPError as e:
if response.status_code == 404:
print("Ressource introuvable")
return None
elif response.status_code == 429:
print("Rate limit - attente 60s")
time.sleep(60)
continue
else:
print(f"Erreur HTTP: {e}")
raise
except RequestException as e:
print(f"Erreur requête: {e}")
raise
return None
# Utilisation
try:
data = appel_api_securise('https://api.example.com/data')
if data:
print(data)
except Exception as e:
print(f"Échec final: {e}")
Exemples Pratiques
Client GitHub API
import requests
from datetime import datetime
class GitHubClient:
def __init__(self, token):
self.base_url = 'https://api.github.com'
self.headers = {
'Authorization': f'token {token}',
'Accept': 'application/vnd.github.v3+json'
}
def get_user_repos(self, username):
"""Récupère repos d'un utilisateur"""
url = f'{self.base_url}/users/{username}/repos'
response = requests.get(url, headers=self.headers)
if response.status_code == 200:
repos = response.json()
return [{
'name': r['name'],
'stars': r['stargazers_count'],
'language': r['language'],
'url': r['html_url']
} for r in repos]
return []
def create_issue(self, owner, repo, titre, description):
"""Crée une issue"""
url = f'{self.base_url}/repos/{owner}/{repo}/issues'
data = {
'title': titre,
'body': description,
'labels': ['bug']
}
response = requests.post(url, json=data, headers=self.headers)
if response.status_code == 201:
issue = response.json()
return f"Issue créée: {issue['html_url']}"
return f"Erreur: {response.status_code}"
# Utilisation
client = GitHubClient('YOUR_TOKEN')
repos = client.get_user_repos('nicolaslema')
for repo in repos:
print(f"{repo['name']}: {repo['stars']} ⭐")
Récupérer météo
def get_meteo(ville, api_key):
"""Récupère météo via OpenWeatherMap"""
url = 'https://api.openweathermap.org/data/2.5/weather'
params = {
'q': ville,
'appid': api_key,
'units': 'metric',
'lang': 'fr'
}
response = requests.get(url, params=params)
if response.status_code == 200:
data = response.json()
return {
'ville': data['name'],
'temperature': data['main']['temp'],
'ressenti': data['main']['feels_like'],
'description': data['weather'][0]['description'],
'humidite': data['main']['humidity'],
'vent': data['wind']['speed']
}
return None
# Utilisation
meteo = get_meteo('Lausanne', 'YOUR_API_KEY')
if meteo:
print(f"{meteo['ville']}: {meteo['temperature']}°C")
print(f"Ressenti: {meteo['ressenti']}°C")
print(f"Description: {meteo['description']}")
Wrapper API complet
import requests
import json
class APIClient:
"""Client API réutilisable"""
def __init__(self, base_url, api_key=None):
self.base_url = base_url.rstrip('/')
self.session = requests.Session()
if api_key:
self.session.headers.update({
'Authorization': f'Bearer {api_key}'
})
self.session.headers.update({
'Content-Type': 'application/json',
'Accept': 'application/json'
})
def _request(self, method, endpoint, **kwargs):
"""Méthode interne pour toutes les requêtes"""
url = f'{self.base_url}/{endpoint.lstrip("/")}'
try:
response = self.session.request(method, url, **kwargs)
response.raise_for_status()
if response.content:
return response.json()
return None
except requests.exceptions.HTTPError as e:
print(f"Erreur HTTP: {e}")
if response.content:
print(response.json())
raise
def get(self, endpoint, params=None):
return self._request('GET', endpoint, params=params)
def post(self, endpoint, data=None):
return self._request('POST', endpoint, json=data)
def put(self, endpoint, data=None):
return self._request('PUT', endpoint, json=data)
def delete(self, endpoint):
return self._request('DELETE', endpoint)
# Utilisation
api = APIClient('https://api.example.com', api_key='YOUR_KEY')
# GET
users = api.get('/users', params={'page': 1})
# POST
new_user = api.post('/users', data={'name': 'Nicolas'})
# PUT
updated = api.put('/users/123', data={'email': 'new@email.com'})
# DELETE
api.delete('/users/123')
Gérer Rate Limiting
import time
from functools import wraps
class RateLimiter:
"""Rate limiter simple"""
def __init__(self, max_calls, period):
self.max_calls = max_calls
self.period = period
self.calls = []
def __call__(self, func):
@wraps(func)
def wrapper(*args, **kwargs):
now = time.time()
# Nettoyer appels anciens
self.calls = [c for c in self.calls if now - c < self.period]
# Vérifier limite
if len(self.calls) >= self.max_calls:
sleep_time = self.period - (now - self.calls[0])
print(f"Rate limit - attente {sleep_time:.1f}s")
time.sleep(sleep_time)
self.calls = []
self.calls.append(now)
return func(*args, **kwargs)
return wrapper
# Utilisation: max 10 appels par minute
@RateLimiter(max_calls=10, period=60)
def appeler_api(url):
response = requests.get(url)
return response.json()
# Les appels seront automatiquement limités
for i in range(20):
data = appeler_api(f'https://api.example.com/data/{i}')