Refactor and enhance timelapse capture system

- Removed obsolete script `script_SANSDEMARRAGE.sh`.
- Added new `Camera.py` and `Connexion.py` files for camera handling and socket communication.
- Implemented `First_Try.py` for initial camera preview testing.
- Created `Humidity.py` for humidity sensor data acquisition.
- Developed `Send_data_stocked.py` for managing and sending stored data.
- Introduced `Time_Lapse_Connection.py` and `Time_Lapse_NoConnection.py` for connected and offline modes.
- Added `get_from_server.py` for retrieving camera status from the server.
- Updated `sync_offline_data.py` for synchronizing offline data.
- Created `timelapse.service` for managing the timelapse service on Raspberry Pi.
- Established package structure with `__init__.py` and `api_client.py` for API interactions.
- Enhanced `capture.py` for managing image captures and data storage.
- Configured `config.py` for centralized configuration management.
- Developed `sensors.py` for handling environmental sensors and camera operations.
- Implemented `timelapse_offline.py` and `timelapse_online.py` for capturing images in offline and online modes.
This commit is contained in:
2025-04-27 16:45:00 +02:00
parent bebfb8adb3
commit 610d220c3f
20 changed files with 951 additions and 152 deletions

View File

@@ -1,148 +1,72 @@
import time
from datetime import datetime
import picamera2 as pc
import smbus2
#!/usr/bin/env python3
# coding: utf-8
"""
Script d'automatisation pour la gestion du système timelapse
Ce script vérifie le statut du serveur et configure le système en conséquence.
"""
import os
import json
import shutil
import requests
import sys
import time
import logging
import subprocess
from timelapse.config import config
from timelapse.api_client import api_client
from timelapse.sensors import micro_controller
class MicroControler:
def __init__(self):
pass
def set_data(self, t):
bus = smbus2.SMBus(1)
ans = bus.write_byte(0x28, t)
#print(ans)
time.sleep(0.015)
ans=smbus2.i2c_msg.read(0x28,3)
print("I get that : ",ans)
def main():
"""
Fonction principale d'automatisation
"""
logging.info("==================== AUTOMATISATION TIMELAPSE ====================")
bus.i2c_rdwr(ans)
data = list(ans)
print(data)
try:
# Récupérer le statut de la caméra depuis l'API
camera_status = api_client.get_camera_status()
def set_data_2_octets(self, value):
value_16b = format(value, "016b")
print(value_16b)
#print(value_16b>>8)
high_address = (value>>8) & 0xFF
low_address = value & 0xFF
print(high_address)
print(low_address)
"""High Address"""
self.set_data(high_address)
"""Low Address"""
self.set_data(low_address)
class Server:
def __init__(self):
self.url_requete = "https://timelapse.kerboul.me/api/camera/status"
self.dic = { "set_config":False,
"maintenance": False,
"stop current config":False,
"timelapse":3,
"conf nb_images":1,
"nb_images restantes":1
}
def get_request(self):
try:
response = requests.get(self.url_requete)
print("Here is the answer from the server : ",response.json())
if response.status_code == 200:
camera_status = response.json()
#print(camera_status)
self.dic["maintenance"] = camera_status["maintenance"]
self.dic["timelapse"] = camera_status["interval"]
self.dic["conf nb_images"] = camera_status["nb_images"]
self.dic["set_config"] = camera_status["idle"]
self.dic["stop current config"] = camera_status["stop_flag"]
else:
print("mauvais code")
print(response.status_code)
except requests.exceptions.RequestException as e:
print("erreur API ou internet")
if camera_status is None:
logging.error("Impossible d'obtenir le statut de la caméra depuis l'API")
return
def get_dic_data(self):
return self.dic
def create_Json(self):
self.filename = "/home/timelapse/Documents/Time_Lapse/CONFIG/config.json"
with open(self.filename, "w") as file:
json.dump(self.get_dic_data(), file)
# Mettre à jour la configuration locale
api_client.update_camera_config(camera_status)
# Vérifier l'état de maintenance
if camera_status.get("maintenance", False):
logging.info("Caméra en mode maintenance, aucune action nécessaire")
return
# Vérifier si un arrêt de la procédure est demandé
if camera_status.get("stop_flag", False):
logging.info("Arrêt de la procédure en cours...")
def get_existing_Json(self):
self.filename = "/home/timelapse/Documents/Time_Lapse/CONFIG/config.json"
with open(self.filename, "r", encoding='utf-8') as file:
datas = json.load(file)
return datas
def create_this_Json(self, dic):
self.dic = { "set_config":dic["set_config"],
"maintenance": dic["maintenance"],
"stop current config":dic["stop current config"],
"timelapse":dic["timelapse"],
"conf nb_images":dic["conf nb_images"],
"nb_images restantes":dic["nb_images restantes"]
}
self.filename = "/home/timelapse/Documents/Time_Lapse/CONFIG/config.json"
with open(self.filename, "w") as file:
json.dump(dic, file)
# Supprimer le fichier de configuration s'il existe
config.delete_config_file()
# Confirmer l'arrêt
api_client.confirm_stop()
return
# Vérifier s'il y a une configuration à appliquer
if not camera_status.get("idle", True):
logging.info("Configuration active détectée")
# Obtenir les paramètres
interval = camera_status.get("interval", 3)
# Envoyer l'intervalle au microcontrôleur
logging.info(f"Envoi de l'intervalle au microcontrôleur: {interval}s")
micro_controller.send_interval(interval)
# Éteindre le système après configuration
logging.info("Configuration terminée, arrêt du système")
subprocess.run(["sudo", "shutdown", "now"])
except Exception as e:
logging.error(f"Erreur dans le script d'automatisation: {e}")
if __name__ == "__main__":
MC = MicroControler()
server = Server()
filename = "/home/timelapse/Documents/Time_Lapse/CONFIG/config.json"
server.get_request()
datas = server.get_dic_data()
#datas = {'set_config': False, 'maintenance': True, 'stop current config': True, 'timelapse': 7, 'conf nb_images': 12, 'nb_images restantes': 12}
print("Here are the datas loaded : ",datas)
if (datas["maintenance"]):
print("- Maintenance")
else:
print("- No Maintenance")
if (datas["stop current config"]):
if (os.path.exists(filename)):
print("- Stopping current config") #suppresion fichier json
os.remove(filename)
if (datas["set_config"]==False): #eddition fichier config
print("- Working on Raspberry config")
if (os.path.exists(filename)):
print("- Existing Config : -1 on Images")
datas = server.get_existing_Json()
datas["nb_images restantes"] = datas["nb_images restantes"] - 1
os.remove(filename)
if (datas["nb_images restantes"]==0):
pass
else:
server.create_this_Json(datas)
else:
datas = {'set_config': False, 'maintenance': False, 'stop current config': False, 'timelapse': 3, 'conf nb_images': 1, 'nb_images restantes': 1}
server.create_this_Json(datas)
else:
server.create_this_Json(datas)
print("- New Config")
print("- Shut Down")
print("TimeLapse sent is : ", datas["timelapse"])
MC.set_data_2_octets(datas["timelapse"])
time.sleep(1)
os.system("sudo shutdown now")
"""
#MC.set_data_2_octets(1)
#MC .set_data_2_octets(datas["timelapse"])
#le mot de passe c'est motdepasse
#nmcli dev wifi "le mot de passe c'est motdepasse" password "motdepasse"
"""
main()

View File

113
script.sh
View File

@@ -1,15 +1,104 @@
#!/bin/bash
nmcli dev wifi connect "Redmi Note 12 Pro" password "kingcard"
check_internet() {
ping -c 4 8.8.8.8 > /dev/null 2>&1
return $?
# Configuration
CONFIG_DIR="/home/timelapse/Documents/Time_Lapse/CONFIG"
LOG_FILE="/home/timelapse/Documents/Time_Lapse/timelapse.log"
BASE_DIR="/home/timelapse/Documents/Time_Lapse"
MAX_RETRIES=5
RETRY_DELAY=10
LOCK_FILE="/var/lock/timelapse.lock"
# Fonction de journalisation
log() {
echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" | tee -a "$LOG_FILE"
}
if check_internet; then
echo "Connecté à internet"
python Time_Lapse_Connection.py
python Send_data_stocked.py
else
echo "pas connecté à internet"
python Time_Lapse_NoConnection.py
# Vérifier si une autre instance est en cours d'exécution
if [ -f "$LOCK_FILE" ]; then
PID=$(cat "$LOCK_FILE")
if ps -p $PID > /dev/null; then
log "Une autre instance est déjà en cours d'exécution (PID: $PID). Arrêt."
exit 1
else
log "Fichier de verrouillage obsolète trouvé. Suppression."
rm -f "$LOCK_FILE"
fi
fi
python Automate.py
# Créer le fichier de verrouillage
echo $$ > "$LOCK_FILE"
# Fonction de nettoyage à la sortie
cleanup() {
log "Nettoyage avant sortie"
rm -f "$LOCK_FILE"
exit $1
}
# Interception des signaux pour le nettoyage
trap 'cleanup 1' INT TERM
log "==============================================================="
log "Démarrage du système timelapse"
# Fonction pour se connecter au WiFi avec plusieurs tentatives
connect_wifi() {
local ssid="Redmi Note 12 Pro"
local password="kingcard"
local retries=0
while [ $retries -lt $MAX_RETRIES ]; do
log "Tentative de connexion WiFi ($((retries+1))/$MAX_RETRIES)"
nmcli dev wifi connect "$ssid" password "$password"
if check_internet; then
log "Connexion WiFi établie"
return 0
fi
retries=$((retries+1))
sleep $RETRY_DELAY
done
log "Échec de connexion WiFi après $MAX_RETRIES tentatives"
return 1
}
# Fonction pour vérifier la connexion internet
check_internet() {
ping -c 1 8.8.8.8 > /dev/null 2>&1
return $?
}
# S'assurer que le répertoire de configuration existe
mkdir -p "$CONFIG_DIR"
# Principal flux d'exécution
connect_wifi
# Vérifier si les chemins Python sont corrects
export PYTHONPATH="$BASE_DIR:$PYTHONPATH"
if check_internet; then
log "Connecté à internet"
# Vérifier si des données locales doivent être envoyées
log "Envoi des données stockées localement"
python3 "$BASE_DIR/sync_offline_data.py"
# Exécuter en mode connecté
log "Exécution du script en mode connecté"
python3 "$BASE_DIR/timelapse_online.py"
else
log "Pas connecté à internet"
log "Exécution du script en mode hors-ligne"
python3 "$BASE_DIR/timelapse_offline.py"
fi
# Exécuter le script d'automatisation dans tous les cas
log "Exécution du script d'automatisation"
python3 "$BASE_DIR/Automate.py"
log "Script terminé"
cleanup 0

40
sync_offline_data.py Normal file
View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
# coding: utf-8
"""
Script pour la synchronisation des données stockées en mode hors ligne.
Remplace l'ancien Send_data_stocked.py
"""
import os
import sys
import logging
from timelapse.config import config
from timelapse.capture import timelapse_manager
def main():
"""
Fonction principale pour la synchronisation des données hors ligne.
"""
logging.info("-------------------------------------------------------------------")
logging.info("Démarrage de la synchronisation des données hors ligne")
try:
# Synchroniser les captures hors ligne
sync_count = timelapse_manager.sync_offline_captures()
if sync_count > 0:
logging.info(f"Synchronisation réussie de {sync_count} captures")
else:
logging.info("Aucune capture à synchroniser")
except Exception as e:
logging.error(f"Erreur lors de la synchronisation des données: {e}")
sys.exit(1)
logging.info("Synchronisation terminée")
if __name__ == "__main__":
main()

18
timelapse.service Normal file
View File

@@ -0,0 +1,18 @@
[Unit]
Description=Service Timelapse Raspberry Pi
DefaultDependencies=no
Before=basic.target
After=local-fs.target
[Service]
Type=simple
ExecStart=/bin/bash /home/timelapse/Documents/Time_Lapse/script.sh
WorkingDirectory=/home/timelapse/Documents/Time_Lapse
User=timelapse
Restart=on-failure
RestartSec=30
StandardOutput=append:/home/timelapse/Documents/Time_Lapse/timelapse-service.log
StandardError=append:/home/timelapse/Documents/Time_Lapse/timelapse-service.log
[Install]
WantedBy=sysinit.target

14
timelapse/__init__.py Normal file
View File

@@ -0,0 +1,14 @@
#!/usr/bin/env python3
# coding: utf-8
"""
Package timelapse pour la gestion du système de capture d'images.
"""
import logging
# Configure le logger au niveau du package
logging.getLogger(__name__).addHandler(logging.NullHandler())
# Version du package
__version__ = '1.0.0'

117
timelapse/api_client.py Normal file
View File

@@ -0,0 +1,117 @@
#!/usr/bin/env python3
# coding: utf-8
"""
Client API pour communiquer avec le serveur timelapse
"""
import os
import json
import logging
import requests
from requests.exceptions import RequestException
from .config import config
class APIClient:
"""Client API pour communiquer avec le serveur timelapse."""
def __init__(self, base_url=None):
"""Initialisation du client API."""
self.base_url = base_url or config.API_BASE_URL
self.headers = {'accept': 'application/json'}
def get_camera_status(self):
"""Récupère l'état de la caméra depuis l'API."""
url = f"{self.base_url}{config.API_ENDPOINTS['status']}"
try:
response = requests.get(url, headers=self.headers)
if response.status_code == 200:
logging.info("Statut caméra récupéré avec succès")
return response.json()
else:
logging.error(f"Erreur lors de la récupération du statut: {response.status_code}")
return None
except RequestException as e:
logging.error(f"Erreur de connexion à l'API: {e}")
return None
def upload_measurement(self, image_path, timestamp, temperature, humidity):
"""Télécharge une mesure (image + données) vers l'API."""
url = f"{self.base_url}{config.API_ENDPOINTS['upload']}"
if not os.path.exists(image_path):
logging.error(f"Image non trouvée: {image_path}")
return None
data = {
'timestamp': timestamp,
'temperature': temperature,
'humidity': humidity
}
files = {
'image': (os.path.basename(image_path), open(image_path, 'rb'), 'image/jpeg')
}
try:
response = requests.post(url, headers=self.headers, data=data, files=files)
if response.status_code == 200 or response.status_code == 201:
logging.info(f"Image téléchargée avec succès: {os.path.basename(image_path)}")
return response.json()
else:
logging.error(f"Erreur lors du téléchargement: {response.status_code}")
return None
except RequestException as e:
logging.error(f"Erreur de connexion lors du téléchargement: {e}")
return None
finally:
# Fermer le fichier
files['image'][1].close()
def confirm_stop(self):
"""Confirme l'arrêt d'une procédure de capture."""
url = f"{self.base_url}{config.API_ENDPOINTS['stop']}"
try:
response = requests.post(url, headers=self.headers)
if response.status_code == 200:
logging.info("Arrêt de la procédure confirmé")
return True
else:
logging.error(f"Erreur lors de la confirmation d'arrêt: {response.status_code}")
return False
except RequestException as e:
logging.error(f"Erreur de connexion lors de la confirmation d'arrêt: {e}")
return False
def update_camera_config(self, status):
"""Met à jour la configuration de la caméra en fonction du statut reçu."""
if not status:
return False
try:
config_update = {
"maintenance": status.get("maintenance", False),
"timelapse": status.get("interval", 3),
"conf_nb_images": status.get("nb_images", 1),
"nb_images_restantes": status.get("nb_images", 1),
"set_config": status.get("idle", False),
"stop_current_config": status.get("stop_flag", False)
}
config.update_config(config_update)
logging.info("Configuration mise à jour depuis l'API")
return True
except Exception as e:
logging.error(f"Erreur lors de la mise à jour de la configuration: {e}")
return False
# Instance globale du client API
api_client = APIClient()

188
timelapse/capture.py Normal file
View File

@@ -0,0 +1,188 @@
#!/usr/bin/env python3
# coding: utf-8
"""
Module principal de gestion du timelapse
"""
import os
import time
import json
import logging
import shutil
from pathlib import Path
from datetime import datetime
from .config import config
from .api_client import api_client
from .sensors import env_sensor, camera
class TimelapseCaptureManager:
"""Gestion de la capture timelapse."""
def __init__(self):
"""Initialisation du gestionnaire de capture."""
self.offline_dir = os.path.join(config.PROJECT_DIR, "_offline")
Path(self.offline_dir).mkdir(parents=True, exist_ok=True)
def single_capture(self, online=True):
"""
Effectue une capture unique avec sauvegarde des mesures.
Args:
online: Si la caméra est connectée au serveur
Returns:
dict: Résultat de l'opération
"""
# Capture des données environnementales
env_data = env_sensor.read_data()
temperature = env_data["temperature"]
humidity = env_data["humidity"]
# Capture de l'image
image_path, timestamp = camera.capture_image()
if not image_path:
logging.error("Échec de la capture d'image")
return None
# Créer un fichier JSON avec les mesures
json_path = os.path.join(os.path.dirname(image_path), f"{timestamp}.json")
data = {
"timestamp": timestamp,
"temperature": temperature,
"humidity": humidity
}
with open(json_path, "w", encoding="utf-8") as json_file:
json.dump(data, json_file, indent=4)
logging.info(f"Données JSON sauvegardées: {json_path}")
# Si online, envoi des données au serveur
if online:
response = api_client.upload_measurement(image_path, timestamp, temperature, humidity)
if response:
logging.info(f"Image téléchargée avec succès, ID: {response.get('id', 'unknown')}")
# On peut éventuellement supprimer l'image locale si nécessaire
# self._cleanup_local_capture(image_path, json_path)
else:
logging.warning("Échec du téléchargement, sauvegarde locale conservée")
# Déplacer les fichiers vers le dossier offline pour synchro ultérieure
self._move_to_offline(image_path, json_path)
else:
# Stockage offline
self._move_to_offline(image_path, json_path)
return data
def _move_to_offline(self, image_path, json_path):
"""
Déplace les fichiers dans le dossier offline pour synchronisation ultérieure.
Args:
image_path: Chemin de l'image
json_path: Chemin du fichier JSON
"""
timestamp = os.path.basename(os.path.dirname(image_path))
offline_subdir = os.path.join(self.offline_dir, timestamp)
Path(offline_subdir).mkdir(parents=True, exist_ok=True)
# Copier les fichiers
shutil.copy2(image_path, os.path.join(offline_subdir, os.path.basename(image_path)))
shutil.copy2(json_path, os.path.join(offline_subdir, os.path.basename(json_path)))
logging.info(f"Capture sauvegardée en mode hors ligne: {offline_subdir}")
def _cleanup_local_capture(self, image_path, json_path):
"""
Nettoie les fichiers locaux après un téléchargement réussi.
Args:
image_path: Chemin de l'image
json_path: Chemin du fichier JSON
"""
capture_dir = os.path.dirname(image_path)
try:
os.remove(image_path)
os.remove(json_path)
os.rmdir(capture_dir) # Supprime le dossier s'il est vide
logging.debug(f"Nettoyage local effectué: {capture_dir}")
except Exception as e:
logging.error(f"Erreur lors du nettoyage local: {e}")
def sync_offline_captures(self):
"""
Synchronise les captures hors ligne avec le serveur.
Returns:
int: Nombre de captures synchronisées
"""
if not os.path.exists(self.offline_dir):
return 0
success_count = 0
offline_dirs = os.listdir(self.offline_dir)
for timestamp_dir in offline_dirs:
dir_path = os.path.join(self.offline_dir, timestamp_dir)
if not os.path.isdir(dir_path):
continue
image_path = os.path.join(dir_path, f"{timestamp_dir}.jpg")
json_path = os.path.join(dir_path, f"{timestamp_dir}.json")
if not (os.path.exists(image_path) and os.path.exists(json_path)):
continue
# Charger les données JSON
try:
with open(json_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# Envoyer les données au serveur
timestamp = data.get("timestamp")
temperature = data.get("temperature")
humidity = data.get("humidity")
response = api_client.upload_measurement(image_path, timestamp, temperature, humidity)
if response:
logging.info(f"Capture hors ligne synchronisée: {timestamp_dir}")
# Supprimer le dossier local
shutil.rmtree(dir_path)
success_count += 1
else:
logging.warning(f"Échec de synchronisation pour: {timestamp_dir}")
except Exception as e:
logging.error(f"Erreur lors de la synchronisation de {timestamp_dir}: {e}")
return success_count
def run_capture_sequence(self, online=True):
"""
Exécute une séquence de capture selon la configuration.
Args:
online: Si la caméra est connectée au serveur
"""
# Vérifier si des images sont restantes à capturer
remaining_images = config.get("nb_images_restantes", 0)
if remaining_images > 0:
logging.info(f"Démarrage d'une séquence de capture ({remaining_images} images restantes)")
# Capturer une image
self.single_capture(online)
# Décrémenter le compteur d'images restantes
remaining_images = config.decrement_remaining_images()
logging.info(f"Images restantes: {remaining_images}")
else:
logging.info("Aucune image restante à capturer")
# Instance globale du gestionnaire de capture
timelapse_manager = TimelapseCaptureManager()

127
timelapse/config.py Normal file
View File

@@ -0,0 +1,127 @@
#!/usr/bin/env python3
# coding: utf-8
"""
Configuration centralisée pour le système timelapse
"""
import os
import json
import logging
from pathlib import Path
class Config:
"""Classe pour gérer la configuration du système timelapse."""
# Chemins par défaut
BASE_DIR = "/home/timelapse/Documents/Time_Lapse"
CONFIG_DIR = os.path.join(BASE_DIR, "CONFIG")
PROJECT_DIR = os.path.join(BASE_DIR, "PROJECT")
LOG_FILE = os.path.join(BASE_DIR, "timelapse.log")
CONFIG_FILE = os.path.join(CONFIG_DIR, "config.json")
# Configuration API
API_BASE_URL = "https://timelapse.kerboul.me/api"
API_ENDPOINTS = {
"status": "/camera/status",
"upload": "/camera/upload",
"stop": "/camera/stop"
}
# Configuration WiFi
WIFI_SSID = "Redmi Note 12 Pro"
WIFI_PASSWORD = "kingcard"
WIFI_MAX_RETRIES = 5
WIFI_RETRY_DELAY = 10
# Configuration par défaut
DEFAULT_CONFIG = {
"set_config": False,
"maintenance": False,
"stop_current_config": False,
"timelapse": 3,
"conf_nb_images": 1,
"nb_images_restantes": 1
}
def __init__(self):
"""Initialisation de la configuration."""
self.ensure_directories()
self.setup_logging()
self.config = self.load_config()
def ensure_directories(self):
"""S'assure que les répertoires nécessaires existent."""
Path(self.CONFIG_DIR).mkdir(parents=True, exist_ok=True)
Path(self.PROJECT_DIR).mkdir(parents=True, exist_ok=True)
def setup_logging(self):
"""Configure le système de logging."""
logging.basicConfig(
filename=self.LOG_FILE,
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# Ajouter également la sortie console
console = logging.StreamHandler()
console.setLevel(logging.INFO)
formatter = logging.getLogger()
formatter.addHandler(console)
def load_config(self):
"""Charge la configuration depuis le fichier ou crée une configuration par défaut."""
if os.path.exists(self.CONFIG_FILE):
try:
with open(self.CONFIG_FILE, "r", encoding="utf-8") as file:
return json.load(file)
except Exception as e:
logging.error(f"Erreur lors du chargement de la configuration: {e}")
return self.DEFAULT_CONFIG
else:
self.save_config(self.DEFAULT_CONFIG)
return self.DEFAULT_CONFIG
def save_config(self, config_data=None):
"""Sauvegarde la configuration dans le fichier."""
if config_data is None:
config_data = self.config
try:
with open(self.CONFIG_FILE, "w", encoding="utf-8") as file:
json.dump(config_data, file, indent=4)
logging.info("Configuration sauvegardée")
except Exception as e:
logging.error(f"Erreur lors de la sauvegarde de la configuration: {e}")
def update_config(self, new_config):
"""Met à jour la configuration avec de nouvelles valeurs."""
self.config.update(new_config)
self.save_config()
def get(self, key, default=None):
"""Récupère une valeur de configuration par sa clé."""
return self.config.get(key, default)
def set(self, key, value):
"""Définit une valeur de configuration."""
self.config[key] = value
self.save_config()
def delete_config_file(self):
"""Supprime le fichier de configuration."""
if os.path.exists(self.CONFIG_FILE):
os.remove(self.CONFIG_FILE)
logging.info("Fichier de configuration supprimé")
def decrement_remaining_images(self):
"""Décrémente le nombre d'images restantes."""
if "nb_images_restantes" in self.config:
self.config["nb_images_restantes"] -= 1
self.save_config()
return self.config.get("nb_images_restantes", 0)
# Instance globale de configuration
config = Config()

173
timelapse/sensors.py Normal file
View File

@@ -0,0 +1,173 @@
#!/usr/bin/env python3
# coding: utf-8
"""
Module de gestion des capteurs (température, humidité) et de la caméra
"""
import os
import time
import logging
import smbus2
from datetime import datetime
import picamera2 as pc
from pathlib import Path
from .config import config
class EnvironmentSensor:
"""Classe pour gérer le capteur d'environnement (température, humidité)."""
def __init__(self, bus=1, address=0x44):
"""
Initialisation du capteur d'environnement.
Args:
bus: Numéro de bus I2C (par défaut 1)
address: Adresse I2C du capteur (par défaut 0x44)
"""
self.bus_num = bus
self.address = address
def read_data(self):
"""
Lit les données du capteur.
Returns:
dict: Dictionnaire contenant la température et l'humidité
"""
try:
bus = smbus2.SMBus(self.bus_num)
# Demande de mesure
bus.write_byte(self.address, 0xFD)
time.sleep(0.1)
# Lecture des données
ans = smbus2.i2c_msg.read(self.address, 6)
bus.i2c_rdwr(ans)
data = list(ans)
# Calcul des valeurs
t_ticks = data[0]*256 + data[1] # Pas de CRC utilisé
rh_ticks = data[3]*256 + data[4] # Pas de CRC utilisé
temperature = (175*t_ticks)/65535. - 45
humidity = (125*rh_ticks)/65535. - 6
return {
"temperature": round(temperature, 2),
"humidity": round(humidity, 2)
}
except Exception as e:
logging.error(f"Erreur lors de la lecture du capteur: {e}")
return {
"temperature": 0.0,
"humidity": 0.0
}
class Camera:
"""Classe pour gérer la caméra Raspberry Pi."""
def __init__(self):
"""Initialisation de la caméra."""
self.image_path = config.PROJECT_DIR
def capture_image(self, timestamp=None):
"""
Capture une image avec la caméra.
Args:
timestamp: Horodatage à utiliser pour le nom du fichier (optionnel)
Returns:
tuple: (chemin de l'image, timestamp)
"""
if timestamp is None:
timestamp = self.get_timestamp()
folder_path = os.path.join(self.image_path, timestamp)
Path(folder_path).mkdir(parents=True, exist_ok=True)
image_path = os.path.join(folder_path, f"{timestamp}.jpg")
try:
cam = pc.Picamera2()
conf = cam.create_preview_configuration(main={"size": (800, 600)})
cam.configure(conf)
cam.start_preview(pc.Preview.QTGL)
cam.start()
time.sleep(0.1)
cam.capture_file(image_path)
cam.close()
logging.info(f"Image capturée: {image_path}")
return image_path, timestamp
except Exception as e:
logging.error(f"Erreur lors de la capture d'image: {e}")
return None, timestamp
@staticmethod
def get_timestamp():
"""
Génère un horodatage pour nommer les fichiers.
Returns:
str: Horodatage au format YYYY-MM-DD HH:MM:SS
"""
now = datetime.now()
return now.strftime("%Y-%m-%d %H:%M:%S")
class MicroController:
"""Classe pour gérer la communication avec le microcontrôleur."""
def __init__(self, bus=1, address=0x28):
"""
Initialisation de la communication avec le microcontrôleur.
Args:
bus: Numéro de bus I2C (par défaut 1)
address: Adresse I2C du microcontrôleur (par défaut 0x28)
"""
self.bus_num = bus
self.address = address
def send_interval(self, interval):
"""
Envoie l'intervalle de capture au microcontrôleur.
Args:
interval: Intervalle de capture en secondes
"""
high_address = (interval >> 8) & 0xFF
low_address = interval & 0xFF
try:
self._send_byte(high_address)
time.sleep(0.015)
self._send_byte(low_address)
logging.info(f"Intervalle envoyé au microcontrôleur: {interval}s")
return True
except Exception as e:
logging.error(f"Erreur lors de l'envoi au microcontrôleur: {e}")
return False
def _send_byte(self, value):
"""
Envoie un octet au microcontrôleur.
Args:
value: Valeur à envoyer (0-255)
"""
bus = smbus2.SMBus(self.bus_num)
bus.write_byte(self.address, value)
# Instances globales
env_sensor = EnvironmentSensor()
camera = Camera()
micro_controller = MicroController()

51
timelapse_offline.py Normal file
View File

@@ -0,0 +1,51 @@
#!/usr/bin/env python3
# coding: utf-8
"""
Script principal pour la capture d'images en mode hors ligne.
Remplace l'ancien Time_Lapse_NoConnection.py
"""
import os
import sys
import time
import logging
from timelapse.config import config
from timelapse.capture import timelapse_manager
from timelapse.sensors import micro_controller
def main():
"""
Fonction principale pour la capture d'images en mode hors ligne.
"""
logging.info("-------------------------------------------------------------------")
logging.info("Démarrage de la capture d'images en mode hors ligne")
try:
# Chargement de la configuration
remaining_images = config.get("nb_images_restantes", 0)
interval = config.get("timelapse", 3) # Intervalle par défaut: 3 secondes
logging.info(f"Configuration: {remaining_images} images à capturer, intervalle de {interval}s")
if remaining_images <= 0:
logging.info("Aucune image à capturer en mode hors ligne")
return
# Exécuter la séquence de capture en mode hors ligne
timelapse_manager.run_capture_sequence(online=False)
# Envoi de l'intervalle au microcontrôleur si nécessaire
if interval > 0:
micro_controller.send_interval(interval)
except Exception as e:
logging.error(f"Erreur lors de la capture en mode hors ligne: {e}")
sys.exit(1)
logging.info("Capture en mode hors ligne terminée")
if __name__ == "__main__":
main()

58
timelapse_online.py Normal file
View File

@@ -0,0 +1,58 @@
#!/usr/bin/env python3
# coding: utf-8
"""
Script principal pour la capture d'images en mode connecté.
Remplace l'ancien Time_Lapse_Connection.py
"""
import os
import sys
import time
import logging
from timelapse.config import config
from timelapse.api_client import api_client
from timelapse.capture import timelapse_manager
def main():
"""
Fonction principale pour la capture d'images en mode connecté.
"""
logging.info("-------------------------------------------------------------------")
logging.info("Démarrage de la capture d'images en mode connecté")
try:
# Vérifier le statut de la caméra via l'API
status = api_client.get_camera_status()
if status is None:
logging.error("Impossible d'obtenir le statut de la caméra depuis l'API")
sys.exit(1)
# Mise à jour de la configuration
api_client.update_camera_config(status)
# Vérification de l'état de maintenance
if status.get("maintenance", False):
logging.info("Caméra en mode maintenance, arrêt du script")
sys.exit(0)
# Vérifier si un arrêt est demandé
if status.get("stop_flag", False):
logging.info("Demande d'arrêt détectée, confirmation au serveur")
api_client.confirm_stop()
sys.exit(0)
# Exécuter la séquence de capture
timelapse_manager.run_capture_sequence(online=True)
except Exception as e:
logging.error(f"Erreur lors de la capture en mode connecté: {e}")
sys.exit(1)
logging.info("Capture en mode connecté terminée")
if __name__ == "__main__":
main()