API
- class sneakpeek.server.SneakpeekServer(consumer: Optional[sneakpeek.queue.consumer.Consumer] = None, scheduler: Optional[sneakpeek.scheduler.model.SchedulerABC] = None, web_server: Optional[fastapi_jsonrpc.API] = None, web_server_port: int = 8080)
Bases:
object
Sneakpeek server. It can run multiple services at once:
API - allows interacting with the scraper storage and scrapers via JsonRPC or UI
Worker - executes scheduled scrapers
Scheduler - automatically schedules scrapers that are stored in the storage
- Parameters
consumer (Consumer | None, optional) – Worker that consumes tasks queue. Defaults to None.
scheduler (SchedulerABC | None, optional) – Scrapers scheduler. Defaults to None.
web_server (fastapi_jsonrpc.API | None, optional) – Web Server that implements the API and exposes the UI to interact with the system. Defaults to None.
web_server_port (int, optional) – Port which is used for Web Server (API, UI and metrics). Defaults to 8080.
- Return type
None
- static create(handlers: list[sneakpeek.scraper.model.ScraperHandler], scraper_storage: sneakpeek.scraper.model.ScraperStorageABC, queue_storage: sneakpeek.queue.model.QueueStorageABC, lease_storage: sneakpeek.scheduler.model.LeaseStorageABC, with_web_server: bool = True, with_worker: bool = True, with_scheduler: bool = True, worker_max_concurrency: int = 50, web_server_port: int = 8080, scheduler_storage_poll_delay: datetime.timedelta = datetime.timedelta(seconds=5), scheduler_lease_duration: datetime.timedelta = datetime.timedelta(seconds=60), middlewares: Optional[list[sneakpeek.scraper.model.Middleware]] = None, add_dynamic_scraper_handler: bool = False, session_logger_handler: Optional[logging.Handler] = None)
Create a Sneakpeek server using the default API, worker and scheduler implementations
- Parameters
handlers (list[ScraperHandler]) – List of handlers that implement scraper logic
scraper_storage (ScraperStorageABC) – Scrapers storage
queue_storage (QueueStorageABC) – Tasks queue storage
lease_storage (LeaseStorageABC) – Lease storage
with_web_server (bool, optional) – Whether to run API service. Defaults to True.
with_worker (bool, optional) – Whether to run worker service. Defaults to True.
with_scheduler (bool, optional) – Whether to run scheduler service. Defaults to True.
worker_max_concurrency (int, optional) – Maximum number of concurrently executed scrapers. Defaults to 50.
web_server_port (int, optional) – Port which is used for Web Server (API, UI and metrics). Defaults to 8080.
scheduler_storage_poll_delay (timedelta, optional) – How long the scheduler waits between polls of the storage for scraper updates. Defaults to 5 seconds.
scheduler_lease_duration (timedelta, optional) – How long the scheduler lease lasts. The lease is required for the scheduler to be able to create new scraper jobs; it ensures that at any point in time there is only one active scheduler instance. Defaults to 1 minute.
middlewares (list[Middleware] | None, optional) – List of middlewares that will be used by the scraper runner. Can be omitted if with_worker is False. Defaults to None.
add_dynamic_scraper_handler (bool, optional) – Whether to add dynamic scraper handler which can execute arbitrary user scripts. Defaults to False.
session_logger_handler (Optional[logging.Handler]) –
- serve(loop: Optional[asyncio.events.AbstractEventLoop] = None, blocking: bool = True) None
Start Sneakpeek server
- Parameters
loop (asyncio.AbstractEventLoop | None, optional) – AsyncIO loop to use. If None, the result of asyncio.get_event_loop() is used. Defaults to None.
blocking (bool, optional) – Whether to block thread while server is running. Defaults to True.
- Return type
None
- stop(loop: Optional[asyncio.events.AbstractEventLoop] = None) None
Stop Sneakpeek server
- Parameters
loop (asyncio.AbstractEventLoop | None, optional) – AsyncIO loop to use. If None, the result of asyncio.get_event_loop() is used. Defaults to None.
- Return type
None
- class sneakpeek.queue.model.EnqueueTaskRequest(*, task_name: str, task_handler: str, priority: sneakpeek.queue.model.TaskPriority, payload: str, timeout: datetime.timedelta | None = None)
Bases:
pydantic.main.BaseModel
Enqueue request
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
- Parameters
task_name (str) –
task_handler (str) –
priority (sneakpeek.queue.model.TaskPriority) –
payload (str) –
timeout (datetime.timedelta | None) –
- Return type
None
- payload: str
Serialized task payload
- priority: sneakpeek.queue.model.TaskPriority
Task priority
- task_handler: str
Name of the task handler
- task_name: str
Task name (used to disallow concurrent execution of the task)
- timeout: datetime.timedelta | None
Task timeout
- class sneakpeek.queue.model.QueueABC
Bases:
abc.ABC
Task priority queue
- abstract async delete_old_tasks(keep_last: int = 50) None
Delete old historical tasks
- Parameters
keep_last (int, optional) – How many tasks to keep. Defaults to 50.
- Return type
None
- abstract async dequeue() sneakpeek.queue.model.Task | None
Try to dequeue a task from the queue.
- Returns
Dequeued task metadata or None if the queue is empty
- Return type
Task | None
- abstract async enqueue(request: sneakpeek.queue.model.EnqueueTaskRequest) sneakpeek.queue.model.Task
Enqueue task
- Parameters
request (EnqueueTaskRequest) – metadata of task to enqueue
- Returns
Enqueued task metadata
- Return type
Task
- Raises
TaskHasActiveRunError – Raised when there are other tasks with the same name in PENDING or STARTED state
- abstract async get_queue_len() int
Get number of pending items in the queue
- Return type
int
- abstract async get_task_instance(task_id: int) sneakpeek.queue.model.Task
Get task instance by ID
- Parameters
task_id (int) – Task ID
- Returns
Task instance
- Return type
Task
- abstract async get_task_instances(task_name: str) list[sneakpeek.queue.model.Task]
Get task instances by task name
- Parameters
task_name (str) – Task name
- Returns
List of task instances
- Return type
list[Task]
- abstract async kill_dead_tasks() list[sneakpeek.queue.model.Task]
Kill dead tasks
- Returns
List of killed dead tasks
- Return type
list[Task]
- abstract async ping_task(id: int) sneakpeek.queue.model.Task
Send a heartbeat for the task
- Parameters
id (int) – Task ID
- Returns
Updated task metadata
- Return type
Task
- Raises
TaskNotFoundError – Raised if task doesn’t exist
TaskPingNotStartedError – Raised if task is still in the PENDING state
TaskPingFinishedError – Raised if task is in a finished state (e.g. DEAD)
- abstract async update_task(task: sneakpeek.queue.model.Task) sneakpeek.queue.model.Task
Update task metadata
- Parameters
task (Task) – updated task metadata to save
- Returns
Updated task metadata
- Return type
Task
- Raises
TaskNotFoundError – Raised when task doesn’t exist
- class sneakpeek.queue.model.QueueStorageABC
Bases:
abc.ABC
Priority queue storage
- abstract async delete_old_tasks(keep_last: int = 50) None
Delete old historical tasks
- Parameters
keep_last (int, optional) – How many tasks to keep. Defaults to 50.
- Return type
None
- abstract async dequeue_task() sneakpeek.queue.model.Task | None
Try to dequeue pending task
- Returns
First pending task or None if the queue is empty
- Return type
Task | None
- abstract async enqueue_task(task: sneakpeek.queue.model.Task) sneakpeek.queue.model.Task
Add a new task instance and put it into the queue
- abstract async get_queue_len() int
Get number of pending items in the queue
- Return type
int
- abstract async get_task_instance(id: int) sneakpeek.queue.model.Task
Get task instance by ID
- Parameters
id (int) – Task ID
- Returns
Found task metadata
- Return type
Task
- Raises
TaskNotFoundError – Raised when task doesn’t exist
- abstract async get_task_instances(task_name: str) list[sneakpeek.queue.model.Task]
Get task instances by task name
- Parameters
task_name (str) – Task name
- Returns
List of task instances
- Return type
list[Task]
- abstract async get_tasks() list[sneakpeek.queue.model.Task]
Get all task instances
- Returns
List of task instances
- Return type
list[Task]
- abstract async update_task(task: sneakpeek.queue.model.Task) sneakpeek.queue.model.Task
Update task metadata
- Parameters
task (Task) – updated task metadata to save
- Returns
Updated task metadata
- Return type
Task
- Raises
TaskNotFoundError – Raised when task doesn’t exist
- class sneakpeek.queue.model.Task(*, id: int, task_name: str, task_handler: str, status: sneakpeek.queue.model.TaskStatus, priority: sneakpeek.queue.model.TaskPriority, created_at: datetime.datetime, started_at: datetime.datetime | None = None, last_active_at: datetime.datetime | None = None, finished_at: datetime.datetime | None = None, payload: str | None = None, result: str | None = None, timeout: datetime.timedelta | None = None)
Bases:
pydantic.main.BaseModel
Queue task state
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
- Parameters
id (int) –
task_name (str) –
task_handler (str) –
status (sneakpeek.queue.model.TaskStatus) –
priority (sneakpeek.queue.model.TaskPriority) –
created_at (datetime.datetime) –
started_at (datetime.datetime | None) –
last_active_at (datetime.datetime | None) –
finished_at (datetime.datetime | None) –
payload (str | None) –
result (str | None) –
timeout (datetime.timedelta | None) –
- Return type
None
- created_at: datetime.datetime
When the task was created and enqueued
- finished_at: datetime.datetime | None
When the task finished
- id: int
Task unique identifier
- last_active_at: datetime.datetime | None
When the task last sent heartbeat
- payload: str | None
Serialized task payload
- priority: sneakpeek.queue.model.TaskPriority
Task priority
- result: str | None
Serialised task result
- started_at: datetime.datetime | None
When the task was dequeued and started being processed by the worker
- status: sneakpeek.queue.model.TaskStatus
Task status
- task_handler: str
Name of the task handler
- task_name: str
Task name (used to disallow concurrent execution of the task)
- timeout: datetime.timedelta | None
Task timeout
- exception sneakpeek.queue.model.TaskHasActiveRunError(data=None)
Bases:
fastapi_jsonrpc.BaseError
- exception sneakpeek.queue.model.TaskNotFoundError(data=None)
Bases:
fastapi_jsonrpc.BaseError
- exception sneakpeek.queue.model.TaskPingFinishedError(data=None)
Bases:
fastapi_jsonrpc.BaseError
- exception sneakpeek.queue.model.TaskPingNotStartedError(data=None)
Bases:
fastapi_jsonrpc.BaseError
- class sneakpeek.queue.model.TaskPriority(value)
Bases:
enum.Enum
Priority of the scraper job
- HIGH = 1
- NORMAL = 2
- UTMOST = 0
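The numeric values define dequeue order: a lower value means a higher priority, so UTMOST (0) comes out before HIGH (1) and NORMAL (2). A minimal sketch of that ordering using a binary heap; the task names are made up for illustration:

```python
import heapq

# Priority values mirror the enum above: lower value = dequeued first
UTMOST, HIGH, NORMAL = 0, 1, 2

queue: list[tuple[int, int, str]] = []  # (priority, enqueue_order, task_name)
for order, (prio, name) in enumerate(
    [(NORMAL, "daily_sync"), (UTMOST, "hotfix"), (HIGH, "reindex")]
):
    # The enqueue order breaks ties between tasks of equal priority
    heapq.heappush(queue, (prio, order, name))

# Tasks come out strictly by priority regardless of insertion order
drained = [heapq.heappop(queue)[2] for _ in range(len(queue))]
```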
- class sneakpeek.queue.model.TaskStatus(value)
Bases:
str, enum.Enum
Scraper job status
- DEAD = 'dead'
Task was inactive for a while, so the scheduler marked it as dead; the task can be scheduled again
- FAILED = 'failed'
Task failed
- KILLED = 'killed'
Task was killed by the user
- PENDING = 'pending'
Task is in the queue
- STARTED = 'started'
Task was dequeued by the worker and is being processed
- SUCCEEDED = 'succeeded'
Task succeeded
- exception sneakpeek.queue.model.TaskTimedOut(data=None)
Bases:
fastapi_jsonrpc.BaseError
- exception sneakpeek.queue.model.UnknownTaskHandlerError(data=None)
Bases:
fastapi_jsonrpc.BaseError
- class sneakpeek.scheduler.model.Lease(*, name: str, owner_id: str, acquired: datetime.datetime, acquired_until: datetime.datetime)
Bases:
pydantic.main.BaseModel
Global lease metadata
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
- Parameters
name (str) –
owner_id (str) –
acquired (datetime.datetime) –
acquired_until (datetime.datetime) –
- Return type
None
- acquired: datetime.datetime
Time when the lease was acquired
- acquired_until: datetime.datetime
Time until which the lease is held
- name: str
Lease name (resource name to be locked)
- owner_id: str
ID of the acquirer (should be the same if you already have the lease and want to prolong it)
- class sneakpeek.scheduler.model.LeaseStorageABC
Bases:
abc.ABC
Global lease storage abstract class
- abstract async maybe_acquire_lease(lease_name: str, owner_id: str, acquire_for: datetime.timedelta) sneakpeek.scheduler.model.Lease | None
Try to acquire lease (global lock).
- Parameters
lease_name (str) – Lease name (resource name to be locked)
owner_id (str) – ID of the acquirer (should be the same if you already have the lease and want to prolong it)
acquire_for (timedelta) – For how long lease will be acquired
- Returns
Lease metadata if it was acquired, None otherwise
- Return type
Lease | None
- abstract async release_lease(lease_name: str, owner_id: str) None
Release lease (global lock)
- Parameters
lease_name (str) – Lease name (resource name to be unlocked)
owner_id (str) – ID of the acquirer
- Return type
None
- class sneakpeek.scheduler.model.MultiPeriodicTasksStorage(storages: list[sneakpeek.scheduler.model.PeriodicTasksStorageABC])
Bases:
sneakpeek.scheduler.model.PeriodicTasksStorageABC
- Parameters
storages (list[sneakpeek.scheduler.model.PeriodicTasksStorageABC]) –
- Return type
None
- class sneakpeek.scheduler.model.PeriodicTasksStorageABC
Bases:
abc.ABC
- class sneakpeek.scheduler.model.SchedulerABC
Bases:
abc.ABC
- class sneakpeek.scheduler.model.StaticPeriodicTasksStorage(tasks: list[sneakpeek.scheduler.model.PeriodicTask])
Bases:
sneakpeek.scheduler.model.PeriodicTasksStorageABC
- Parameters
tasks (list[sneakpeek.scheduler.model.PeriodicTask]) –
- Return type
None
- class sneakpeek.scheduler.model.TaskSchedule(value)
Bases:
str, enum.Enum
Periodic task schedule options. Note that two concurrent tasks with the same name are disallowed, so if there's an active task a new one won't be scheduled
- CRONTAB = 'crontab'
Specify crontab when scraper should be scheduled
- EVERY_DAY = 'every_day'
Scraper will be scheduled every day
- EVERY_HOUR = 'every_hour'
Scraper will be scheduled every hour
- EVERY_MINUTE = 'every_minute'
Scraper will be scheduled every minute
- EVERY_MONTH = 'every_month'
Scraper will be scheduled every month
- EVERY_SECOND = 'every_second'
Scraper will be scheduled every second
- EVERY_WEEK = 'every_week'
Scraper will be scheduled every week
- INACTIVE = 'inactive'
Scraper won’t be automatically scheduled
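The fixed-interval options map naturally to periods, while CRONTAB needs a cron parser and INACTIVE disables scheduling altogether. A hypothetical sketch of such a mapping (the handling is an assumption, and EVERY_MONTH is approximated as 30 days for illustration):

```python
import datetime as dt

# Assumed periods for the fixed-interval schedule options;
# CRONTAB and INACTIVE have no fixed period and are handled separately
SCHEDULE_PERIODS: dict[str, dt.timedelta] = {
    "every_second": dt.timedelta(seconds=1),
    "every_minute": dt.timedelta(minutes=1),
    "every_hour": dt.timedelta(hours=1),
    "every_day": dt.timedelta(days=1),
    "every_week": dt.timedelta(weeks=1),
    "every_month": dt.timedelta(days=30),  # rough approximation
}


def is_due(schedule: str, last_run: dt.datetime, now: dt.datetime) -> bool:
    """Decide whether a periodic task should be scheduled again."""
    if schedule == "inactive":
        return False
    if schedule == "crontab":
        # Would need a cron expression parser (e.g. croniter) to evaluate
        raise NotImplementedError("crontab schedules need a cron parser")
    return now - last_run >= SCHEDULE_PERIODS[schedule]
```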
- class sneakpeek.scraper.model.HttpMethod(value)
Bases:
str, enum.Enum
HTTP method
- class sneakpeek.scraper.model.Middleware
Bases:
abc.ABC
Abstract class for the middleware which is called before each request and response
- abstract property name: str
Name of the middleware
- abstract async on_request(request: sneakpeek.scraper.model.Request, config: Optional[Any] = None) sneakpeek.scraper.model.Request
Function that is called on each (HTTP) request before it is dispatched.
- abstract async on_response(request: sneakpeek.scraper.model.Request, response: aiohttp.client_reqrep.ClientResponse, config: Optional[Any] = None) aiohttp.client_reqrep.ClientResponse
Function that is called on each (HTTP) response before its result is returned to the caller.
- Parameters
request (Request) – Request metadata
response (aiohttp.ClientResponse) – HTTP Response
config (Any | None, optional) – Middleware configuration. Defaults to None.
- Returns
HTTP Response
- Return type
aiohttp.ClientResponse
- class sneakpeek.scraper.model.Request(method: sneakpeek.scraper.model.HttpMethod, url: str, headers: Optional[dict[str, str]] = None, kwargs: Optional[dict[str, typing.Any]] = None)
Bases:
object
HTTP Request metadata
- Parameters
method (sneakpeek.scraper.model.HttpMethod) –
url (str) –
headers (dict[str, str] | None) –
kwargs (dict[str, typing.Any] | None) –
- Return type
None
- class sneakpeek.scraper.model.Scraper(*, id: str, name: str, handler: str, schedule: sneakpeek.scheduler.model.TaskSchedule, schedule_crontab: str | None = None, config: sneakpeek.scraper.model.ScraperConfig, priority: sneakpeek.queue.model.TaskPriority = TaskPriority.NORMAL, state: str | None = None, timeout: datetime.timedelta | None = None)
Bases:
pydantic.main.BaseModel
Scraper metadata
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
- Parameters
id (str) –
name (str) –
handler (str) –
schedule (sneakpeek.scheduler.model.TaskSchedule) –
schedule_crontab (str | None) –
config (sneakpeek.scraper.model.ScraperConfig) –
priority (sneakpeek.queue.model.TaskPriority) –
state (str | None) –
timeout (datetime.timedelta | None) –
- Return type
None
- config: sneakpeek.scraper.model.ScraperConfig
Scraper configuration that is passed to the handler
- handler: str
Name of the scraper handler that implements scraping logic
- id: str
Scraper unique identifier
- name: str
Scraper name
- priority: sneakpeek.queue.model.TaskPriority
Default priority to enqueue scraper jobs with
- schedule: sneakpeek.scheduler.model.TaskSchedule
Scraper schedule configuration
- schedule_crontab: str | None
Must be defined if schedule is set to CRONTAB
- state: str | None
Scraper state (might be useful to optimise scraping, e.g. only process pages that weren’t processed in the last jobs)
- timeout: datetime.timedelta | None
Timeout for the single scraper job
- class sneakpeek.scraper.model.ScraperContextABC
Bases:
abc.ABC
Scraper context - helper class that implements basic HTTP client which logic can be extended by middleware that can preprocess request (e.g. Rate Limiter) and postprocess response (e.g. Response logger).
- async delete(url: str | list[str], *, headers: Optional[dict[str, str]] = None, max_concurrency: int = 0, return_exceptions: bool = False, **kwargs) aiohttp.client_reqrep.ClientResponse | list[aiohttp.client_reqrep.ClientResponse | Exception]
Make DELETE request to the given URL(s)
- Parameters
url (str | list[str]) – URL(s) to send DELETE request to
headers (HttpHeaders | None, optional) – HTTP headers. Defaults to None.
max_concurrency (int, optional) – Maximum number of concurrent requests. If set to 0 no limit is applied. Defaults to 0.
return_exceptions (bool, optional) – Whether to return exceptions instead of raising when multiple URLs are provided. Defaults to False.
**kwargs – See aiohttp.delete() for the full list of arguments
- Returns
HTTP response(s)
- Return type
Response
- abstract async download_file(method: sneakpeek.scraper.model.HttpMethod, url: str, *, file_path: Optional[str] = None, file_process_fn: Optional[Callable[[str], Awaitable[Any]]] = None, headers: Optional[dict[str, str]] = None, **kwargs) Union[str, Any]
Perform HTTP request and save it to the specified file
- Parameters
method (HttpMethod) – HTTP request method to perform
url (str) – URL to send HTTP request to
file_path (str, optional) – Path of the file to save the response to. If not specified, a temporary file name will be generated. Defaults to None.
file_process_fn (Callable[[str], Awaitable[Any]], optional) – Function to process the file. If specified, the function is applied to the file and its result returned; the file is removed after the call. Defaults to None.
headers (HttpHeaders | None, optional) – HTTP headers. Defaults to None.
**kwargs – See aiohttp.request() for the full list of arguments
- Returns
File path if file process function is not defined or file process function result otherwise
- Return type
str | Any
- abstract async download_files(method: sneakpeek.scraper.model.HttpMethod, urls: list[str], *, file_paths: Optional[list[str]] = None, file_process_fn: Optional[Callable[[str], Awaitable[Any]]] = None, headers: Optional[dict[str, str]] = None, max_concurrency: int = 0, return_exceptions: bool = False, **kwargs) list[typing.Union[str, typing.Any, Exception]]
Perform HTTP requests and save them to the specified files
- Parameters
method (HttpMethod) – HTTP request method to perform
urls (list[str]) – URLs to send HTTP request to
file_paths (list[str], optional) – Paths of the files to save the responses to. If not specified, temporary file names will be generated. Defaults to None.
file_process_fn (Callable[[str], Awaitable[Any]], optional) – Function to process each file. If specified, the function is applied to the file and its result returned; the file is removed after the call. Defaults to None.
headers (HttpHeaders | None, optional) – HTTP headers. Defaults to None.
max_concurrency (int, optional) – Maximum number of concurrent requests. If set to 0 no limit is applied. Defaults to 0.
return_exceptions (bool, optional) – Whether to return exceptions instead of raising when multiple URLs are provided. Defaults to False.
**kwargs – See aiohttp.request() for the full list of arguments
- Returns
For each URL: file path if file process function is not defined or file process function result otherwise
- Return type
list[str | Any | Exception]
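The max_concurrency and return_exceptions semantics above can be sketched with a semaphore-bounded asyncio.gather. This is an illustration, not the library's code; the fetch callable is a stand-in for the real HTTP request:

```python
import asyncio


async def fetch_all(urls, fetch, max_concurrency=0, return_exceptions=False):
    """Fetch every URL with at most `max_concurrency` in flight (0 = no limit)."""
    semaphore = asyncio.Semaphore(max_concurrency or len(urls))

    async def bounded(url):
        async with semaphore:
            return await fetch(url)

    # With return_exceptions=True, failures come back in-place instead of raising
    return await asyncio.gather(
        *(bounded(u) for u in urls), return_exceptions=return_exceptions
    )


async def demo():
    async def fake_fetch(url):
        if "bad" in url:
            raise ValueError(url)
        return f"body of {url}"

    return await fetch_all(
        ["https://a", "https://bad", "https://c"], fake_fetch,
        max_concurrency=2, return_exceptions=True,
    )
```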
- async get(url: str | list[str], *, headers: Optional[dict[str, str]] = None, max_concurrency: int = 0, return_exceptions: bool = False, **kwargs) aiohttp.client_reqrep.ClientResponse | list[aiohttp.client_reqrep.ClientResponse | Exception]
Make GET request to the given URL(s)
- Parameters
url (str | list[str]) – URL(s) to send GET request to
headers (HttpHeaders | None, optional) – HTTP headers. Defaults to None.
max_concurrency (int, optional) – Maximum number of concurrent requests. If set to 0 no limit is applied. Defaults to 0.
return_exceptions (bool, optional) – Whether to return exceptions instead of raising when multiple URLs are provided. Defaults to False.
**kwargs – See aiohttp.get() for the full list of arguments
- Returns
HTTP response(s)
- Return type
Response
- async head(url: str | list[str], *, headers: Optional[dict[str, str]] = None, max_concurrency: int = 0, return_exceptions: bool = False, **kwargs) aiohttp.client_reqrep.ClientResponse | list[aiohttp.client_reqrep.ClientResponse | Exception]
Make HEAD request to the given URL(s)
- Parameters
url (str | list[str]) – URL(s) to send HEAD request to
headers (HttpHeaders | None, optional) – HTTP headers. Defaults to None.
max_concurrency (int, optional) – Maximum number of concurrent requests. If set to 0 no limit is applied. Defaults to 0.
return_exceptions (bool, optional) – Whether to return exceptions instead of raising when multiple URLs are provided. Defaults to False.
**kwargs – See aiohttp.head() for the full list of arguments
- Returns
HTTP response(s)
- Return type
Response
- async options(url: str | list[str], *, headers: Optional[dict[str, str]] = None, max_concurrency: int = 0, return_exceptions: bool = False, **kwargs) aiohttp.client_reqrep.ClientResponse | list[aiohttp.client_reqrep.ClientResponse | Exception]
Make OPTIONS request to the given URL(s)
- Parameters
url (str | list[str]) – URL(s) to send OPTIONS request to
headers (HttpHeaders | None, optional) – HTTP headers. Defaults to None.
max_concurrency (int, optional) – Maximum number of concurrent requests. If set to 0 no limit is applied. Defaults to 0.
return_exceptions (bool, optional) – Whether to return exceptions instead of raising when multiple URLs are provided. Defaults to False.
**kwargs – See aiohttp.options() for the full list of arguments
- Returns
HTTP response(s)
- Return type
Response
- async post(url: str | list[str], *, headers: Optional[dict[str, str]] = None, max_concurrency: int = 0, return_exceptions: bool = False, **kwargs) aiohttp.client_reqrep.ClientResponse | list[aiohttp.client_reqrep.ClientResponse | Exception]
Make POST request to the given URL(s)
- Parameters
url (str | list[str]) – URL(s) to send POST request to
headers (HttpHeaders | None, optional) – HTTP headers. Defaults to None.
max_concurrency (int, optional) – Maximum number of concurrent requests. If set to 0 no limit is applied. Defaults to 0.
return_exceptions (bool, optional) – Whether to return exceptions instead of raising when multiple URLs are provided. Defaults to False.
**kwargs – See aiohttp.post() for the full list of arguments
- Returns
HTTP response(s)
- Return type
Response
- async put(url: str | list[str], *, headers: Optional[dict[str, str]] = None, max_concurrency: int = 0, return_exceptions: bool = False, **kwargs) aiohttp.client_reqrep.ClientResponse | list[aiohttp.client_reqrep.ClientResponse | Exception]
Make PUT request to the given URL(s)
- Parameters
url (str | list[str]) – URL(s) to send PUT request to
headers (HttpHeaders | None, optional) – HTTP headers. Defaults to None.
max_concurrency (int, optional) – Maximum number of concurrent requests. If set to 0 no limit is applied. Defaults to 0.
return_exceptions (bool, optional) – Whether to return exceptions instead of raising when multiple URLs are provided. Defaults to False.
**kwargs – See aiohttp.put() for the full list of arguments
- Returns
HTTP response(s)
- Return type
Response
- abstract async request(method: sneakpeek.scraper.model.HttpMethod, url: str | list[str], *, headers: Optional[dict[str, str]] = None, max_concurrency: int = 0, return_exceptions: bool = False, **kwargs) aiohttp.client_reqrep.ClientResponse | list[aiohttp.client_reqrep.ClientResponse | Exception]
Perform HTTP request to the given URL(s)
- Parameters
method (HttpMethod) – HTTP request method to perform
url (str | list[str]) – URL(s) to send HTTP request to
headers (HttpHeaders | None, optional) – HTTP headers. Defaults to None.
max_concurrency (int, optional) – Maximum number of concurrent requests. If set to 0 no limit is applied. Defaults to 0.
return_exceptions (bool, optional) – Whether to return exceptions instead of raising when multiple URLs are provided. Defaults to False.
**kwargs – See aiohttp.request() for the full list of arguments
- Returns
HTTP response(s)
- Return type
Response
- abstract async update_scraper_state(state: str) sneakpeek.scraper.model.Scraper
Update scraper state
- Parameters
state (str) – State to persist
- Returns
Updated scraper metadata
- Return type
Scraper
- class sneakpeek.scraper.model.ScraperHandler
Bases:
abc.ABC
Abstract class that scraper logic handler must implement
- abstract property name: str
Name of the handler
- abstract async run(context: sneakpeek.scraper.model.ScraperContextABC) str
Execute scraper logic
- Parameters
context (ScraperContext) – Scraper context
- Returns
Scraper result that will be persisted in the storage (should be a relatively small piece of information that summarises the job result)
- Return type
str
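A hypothetical handler following this contract: it fetches a page through the context and returns a small serialized summary rather than the full payload. The handler name, URL, and summary format are made up, and the context is duck-typed with a stub here; a real handler would receive a ScraperContextABC:

```python
import asyncio
import json


class TitleScraperHandler:
    """Hypothetical handler: fetches one page and returns a small JSON summary."""

    @property
    def name(self) -> str:
        return "title_scraper"

    async def run(self, context) -> str:
        # `context` is duck-typed here; the real one implements ScraperContextABC
        response = await context.get("https://example.com")
        # Keep the persisted result small: a summary, not the full page body
        return json.dumps({"url": "https://example.com", "length": len(response)})


class StubContext:
    """Test double standing in for the scraper context's HTTP client."""

    async def get(self, url: str) -> str:
        return "<html><title>Example</title></html>"


async def demo() -> str:
    return await TitleScraperHandler().run(StubContext())
```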
- exception sneakpeek.scraper.model.ScraperNotFoundError(data=None)
Bases:
fastapi_jsonrpc.BaseError
- class sneakpeek.scraper.model.ScraperRunnerABC
Bases:
abc.ABC
- abstract async run(handler: sneakpeek.scraper.model.ScraperHandler, scraper: sneakpeek.scraper.model.Scraper) str
- Parameters
handler (ScraperHandler) – Scraper logic implementation
scraper (Scraper) – Scraper metadata
- Return type
str
- class sneakpeek.scraper.model.ScraperStorageABC
- exception sneakpeek.scraper.model.StorageIsReadOnlyError(data=None)
Bases:
fastapi_jsonrpc.BaseError
- exception sneakpeek.scraper.model.UnknownScraperHandlerError(data=None)
Bases:
fastapi_jsonrpc.BaseError
- class sneakpeek.queue.queue.Queue(storage: sneakpeek.queue.model.QueueStorageABC, dead_task_timeout: datetime.timedelta = datetime.timedelta(seconds=300))
Bases:
sneakpeek.queue.model.QueueABC
- Parameters
storage (sneakpeek.queue.model.QueueStorageABC) –
dead_task_timeout (datetime.timedelta) –
- Return type
None
- async delete_old_tasks(keep_last: int = 50) None
Delete old historical tasks
- Parameters
keep_last (int, optional) – How many tasks to keep. Defaults to 50.
- Return type
None
- async dequeue() sneakpeek.queue.model.Task | None
Try to dequeue a task from the queue.
- Returns
Dequeued task metadata or None if the queue is empty
- Return type
Task | None
- async enqueue(request: sneakpeek.queue.model.EnqueueTaskRequest) sneakpeek.queue.model.Task
Enqueue task
- Parameters
request (EnqueueTaskRequest) – metadata of task to enqueue
- Returns
Enqueued task metadata
- Return type
Task
- Raises
TaskHasActiveRunError – Raised when there are other tasks with the same name in PENDING or STARTED state
- async get_queue_len() int
Get number of pending items in the queue
- Return type
int
- async get_task_instance(task_id: int) sneakpeek.queue.model.Task
Get task instance by ID
- Parameters
task_id (int) – Task ID
- Returns
Task instance
- Return type
Task
- async get_task_instances(task_name: str) list[sneakpeek.queue.model.Task]
Get task instances by task name
- Parameters
task_name (str) – Task name
- Returns
List of task instances
- Return type
list[Task]
- async kill_dead_tasks() list[sneakpeek.queue.model.Task]
Kill dead tasks
- Returns
List of killed dead tasks
- Return type
list[Task]
- async ping_task(id: int) sneakpeek.queue.model.Task
Send a heartbeat for the task
- Parameters
id (int) – Task ID
- Returns
Updated task metadata
- Return type
Task
- Raises
TaskNotFoundError – Raised if task doesn’t exist
TaskPingNotStartedError – Raised if task is still in the PENDING state
TaskPingFinishedError – Raised if task is in a finished state (e.g. DEAD)
- async update_task(task: sneakpeek.queue.model.Task) sneakpeek.queue.model.Task
Update task metadata
- Parameters
task (Task) – updated task metadata to save
- Returns
Updated task metadata
- Return type
Task
- Raises
TaskNotFoundError – Raised when task doesn’t exist
- class sneakpeek.queue.in_memory_storage.InMemoryQueueStorage
Bases:
sneakpeek.queue.model.QueueStorageABC
In memory queue storage (should only be used for development purposes)
- Return type
None
- async delete_old_tasks(keep_last: int = 50) None
Delete old historical tasks
- Parameters
keep_last (int, optional) – How many tasks to keep. Defaults to 50.
- Return type
None
- async dequeue_task() sneakpeek.queue.model.Task | None
Try to dequeue pending task
- Returns
First pending task or None if the queue is empty
- Return type
Task | None
- async enqueue_task(task: sneakpeek.queue.model.Task) sneakpeek.queue.model.Task
Add a new task instance and put it into the queue
- async get_queue_len() int
Get number of pending items in the queue
- Return type
int
- async get_task_instance(id: int) sneakpeek.queue.model.Task
Get task instance by ID
- Parameters
id (int) – Task ID
- Returns
Found task metadata
- Return type
Task
- Raises
TaskNotFoundError – Raised when task doesn’t exist
- async get_task_instances(task_name: str) list[sneakpeek.queue.model.Task]
Get task instances by task name
- Parameters
task_name (str) – Task name
- Returns
List of task instances
- Return type
list[Task]
- async get_tasks() list[sneakpeek.queue.model.Task]
Get all task instances
- Returns
List of task instances
- Return type
list[Task]
- async update_task(task: sneakpeek.queue.model.Task) sneakpeek.queue.model.Task
Update task metadata
- Parameters
task (Task) – updated task metadata to save
- Returns
Updated task metadata
- Return type
Task
- Raises
TaskNotFoundError – Raised when task doesn’t exist
- class sneakpeek.queue.redis_storage.RedisQueueStorage(redis: redis.asyncio.client.Redis, task_ttl: datetime.timedelta = datetime.timedelta(days=7))
Bases:
sneakpeek.queue.model.QueueStorageABC
Redis queue storage. The queue has two components: a priority queue implemented as a sorted set (ZADD and ZPOPMIN), and a mapping from task name to the set of its task instances
- Parameters
redis (Redis) – Async redis client
task_ttl (timedelta) – TTL of the task record in the redis. Defaults to 7 days.
- Return type
None
- async delete_old_tasks(keep_last: int = 50) None
Delete old historical tasks
- Parameters
keep_last (int, optional) – How many tasks to keep. Defaults to 50.
- Return type
None
- async dequeue_task() sneakpeek.queue.model.Task | None
Try to dequeue pending task
- Returns
First pending task or None if the queue is empty
- Return type
Task | None
- async enqueue_task(task: sneakpeek.queue.model.Task) sneakpeek.queue.model.Task
Add a new task instance and put it into the queue
- async get_queue_len() int
Get number of pending items in the queue
- Return type
int
- async get_task_instance(id: int) sneakpeek.queue.model.Task
Get task instance by ID
- Parameters
id (int) – Task ID
- Returns
Found task metadata
- Return type
Task
- Raises
TaskNotFoundError – Raised when task doesn’t exist
- async get_task_instances(task_name: str) list[sneakpeek.queue.model.Task]
Get task instances by task name
- Parameters
task_name (str) – Task name
- Returns
List of task instances
- Return type
list[Task]
- async get_tasks() list[sneakpeek.queue.model.Task]
Get all task instances
- Returns
List of task instances
- Return type
list[Task]
- async update_task(task: sneakpeek.queue.model.Task) sneakpeek.queue.model.Task
Update task metadata
- Parameters
task (Task) – Updated task metadata to save
- Returns
Updated task metadata
- Return type
Task
- Raises
TaskNotFoundError – Raised when task doesn’t exist
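As a sketch of how the two components interact, the in-memory model below mirrors the documented layout (a ZADD/ZPOPMIN-ordered pending set plus a task-name index). Class and method names are illustrative assumptions, not the actual RedisQueueStorage code:

```python
import heapq

# Illustrative in-memory model of the documented Redis layout: a sorted set
# ordered by enqueue score (ZADD/ZPOPMIN) plus a per-task-name index of task
# instances. All names here are hypothetical.
class MiniQueueModel:
    def __init__(self):
        self._pending = []    # models the Redis sorted set of pending tasks
        self._instances = {}  # models the task name -> task instances mapping

    def enqueue(self, task_id, task_name, score):
        heapq.heappush(self._pending, (score, task_id))  # ZADD analogue
        self._instances.setdefault(task_name, []).append(task_id)

    def dequeue(self):
        # ZPOPMIN analogue: pop the lowest-score (oldest) pending task
        if not self._pending:
            return None
        return heapq.heappop(self._pending)[1]

    def queue_len(self):
        return len(self._pending)
```

Tasks come back in score order regardless of insertion order, which is what makes the sorted set a priority queue.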
- class sneakpeek.scheduler.scheduler.Scheduler(tasks_storage: sneakpeek.scheduler.model.PeriodicTasksStorageABC, lease_storage: sneakpeek.scheduler.model.LeaseStorageABC, queue: sneakpeek.queue.model.QueueABC, loop: Optional[asyncio.events.AbstractEventLoop] = None, tasks_poll_delay: datetime.timedelta = datetime.timedelta(seconds=5), lease_duration: datetime.timedelta = datetime.timedelta(seconds=60))
Bases:
sneakpeek.scheduler.model.SchedulerABC
- Parameters
tasks_storage (sneakpeek.scheduler.model.PeriodicTasksStorageABC) –
lease_storage (sneakpeek.scheduler.model.LeaseStorageABC) –
queue (sneakpeek.queue.model.QueueABC) –
loop (asyncio.events.AbstractEventLoop | None) –
tasks_poll_delay (datetime.timedelta) –
lease_duration (datetime.timedelta) –
- Return type
None
- async update_tasks() None
Poll storage for all existing periodic tasks and update corresponding scheduler jobs
- Return type
None
- class sneakpeek.scheduler.in_memory_lease_storage.InMemoryLeaseStorage
Bases:
sneakpeek.scheduler.model.LeaseStorageABC
In memory storage for leases. Should only be used for development purposes
- Return type
None
- async maybe_acquire_lease(lease_name: str, owner_id: str, acquire_for: datetime.timedelta) sneakpeek.scheduler.model.Lease | None
Try to acquire lease (global lock).
- Parameters
lease_name (str) – Lease name (resource name to be locked)
owner_id (str) – ID of the acquirer (should be the same if you already have the lease and want to prolong it)
acquire_for (timedelta) – For how long the lease will be acquired
- Returns
Lease metadata if it was acquired, None otherwise
- Return type
Lease | None
- async release_lease(lease_name: str, owner_id: str) None
Release lease (global lock)
- Parameters
lease_name (str) – Lease name (resource name to be unlocked)
owner_id (str) – ID of the acquirer
- Return type
None
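The lease semantics described above can be sketched with a minimal synchronous model (the real storage is async; names here are hypothetical): a lease is granted when it is free, expired, or already held by the same owner, which is what lets a scheduler prolong its own lease.

```python
import time

# Hypothetical sketch of the lease (global lock) semantics: one owner at a
# time, same owner may prolong, expired leases may be taken over.
class LeaseModel:
    def __init__(self):
        self._leases = {}  # lease_name -> (owner_id, expires_at)

    def maybe_acquire(self, lease_name, owner_id, acquire_for_s, now=None):
        now = time.monotonic() if now is None else now
        holder = self._leases.get(lease_name)
        if holder is None or holder[1] <= now or holder[0] == owner_id:
            self._leases[lease_name] = (owner_id, now + acquire_for_s)
            return (lease_name, owner_id)
        return None  # someone else holds an unexpired lease

    def release(self, lease_name, owner_id):
        # only the current owner may release the lease
        if self._leases.get(lease_name, (None,))[0] == owner_id:
            del self._leases[lease_name]
```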
- class sneakpeek.scheduler.redis_lease_storage.RedisLeaseStorage(redis: redis.asyncio.client.Redis)
Bases:
sneakpeek.scheduler.model.LeaseStorageABC
Redis storage for leases
- Parameters
redis (Redis) – Async redis client
- Return type
None
- async maybe_acquire_lease(lease_name: str, owner_id: str, acquire_for: datetime.timedelta) sneakpeek.scheduler.model.Lease | None
Try to acquire lease (global lock).
- Parameters
lease_name (str) – Lease name (resource name to be locked)
owner_id (str) – ID of the acquirer (should be the same if you already have the lease and want to prolong it)
acquire_for (timedelta) – For how long the lease will be acquired
- Returns
Lease metadata if it was acquired, None otherwise
- Return type
Lease | None
- async release_lease(lease_name: str, owner_id: str) None
Release lease (global lock)
- Parameters
lease_name (str) – Lease name (resource name to be unlocked)
owner_id (str) – ID of the acquirer
- Return type
None
- class sneakpeek.scraper.context.ScraperContext(config: sneakpeek.scraper.model.ScraperConfig, middlewares: Optional[list[sneakpeek.scraper.model.Middleware]] = None, scraper_state: Optional[str] = None, update_scraper_state_func: Optional[Callable] = None)
Bases:
sneakpeek.scraper.model.ScraperContextABC
Scraper context - a helper class that implements a basic HTTP client whose logic can be extended by middleware that preprocess requests (e.g. a rate limiter) and postprocess responses (e.g. a response logger).
- Parameters
config (ScraperConfig) – Scraper configuration
middlewares (list[Middleware] | None, optional) – List of available middleware. Defaults to None.
scraper_state (str | None, optional) – Scraper state. Defaults to None.
update_scraper_state_func (Callable | None, optional) – Function that updates scraper state. Defaults to None.
- Return type
None
- async download_file(method: sneakpeek.scraper.model.HttpMethod, url: str, *, file_path: Optional[str] = None, file_process_fn: Optional[Callable[[str], Awaitable[Any]]] = None, headers: Optional[dict[str, str]] = None, **kwargs) Union[str, Any]
Perform HTTP request and save it to the specified file
- Parameters
method (HttpMethod) – HTTP request method to perform
url (str) – URL to send HTTP request to
file_path (str, optional) – Path of the file to save the response to. If not specified, a temporary file name will be generated. Defaults to None.
file_process_fn (Callable[[str], Any], optional) – Function to process the file. If specified, the function is applied to the file and its result is returned; the file is removed after the call. Defaults to None.
headers (HttpHeaders | None, optional) – HTTP headers. Defaults to None.
**kwargs – See aiohttp.request() for the full list of arguments
- Returns
File path if file process function is not defined or file process function result otherwise
- Return type
str | Any
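The file_path / file_process_fn contract can be sketched synchronously (the real method is async and performs the HTTP request itself; save_and_process and the payload argument are illustrative stand-ins):

```python
import os
import tempfile

# Sketch of the documented contract (assumed, not the library code): write
# the payload to file_path (or a generated temp file); if a process function
# is given, apply it, return its result, and remove the file afterwards;
# otherwise return the file path.
def save_and_process(payload, file_path=None, file_process_fn=None):
    if file_path is None:
        fd, file_path = tempfile.mkstemp()  # generate a temporary file name
        os.close(fd)
    with open(file_path, "wb") as f:
        f.write(payload)
    if file_process_fn is None:
        return file_path
    try:
        return file_process_fn(file_path)
    finally:
        os.remove(file_path)  # the file only lives for the function call
```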
- async download_files(method: sneakpeek.scraper.model.HttpMethod, urls: list[str], *, file_paths: Optional[list[str]] = None, file_process_fn: Optional[Callable[[str], Awaitable[Any]]] = None, headers: Optional[dict[str, str]] = None, max_concurrency: int = 0, return_exceptions: bool = False, **kwargs) list[typing.Union[str, typing.Any, Exception]]
Perform HTTP requests and save them to the specified files
- Parameters
method (HttpMethod) – HTTP request method to perform
urls (list[str]) – URLs to send HTTP request to
file_paths (list[str], optional) – Paths of the files to save the responses to. If not specified, temporary file names will be generated. Defaults to None.
file_process_fn (Callable[[str], Any], optional) – Function to process each file. If specified, the function is applied to the file and its result is returned; the file is removed after the call. Defaults to None.
headers (HttpHeaders | None, optional) – HTTP headers. Defaults to None.
max_concurrency (int, optional) – Maximum number of concurrent requests. If set to 0 no limit is applied. Defaults to 0.
return_exceptions (bool, optional) – Whether to return exceptions instead of raising them when multiple URLs are provided. Defaults to False.
**kwargs – See aiohttp.request() for the full list of arguments
- Returns
For each URL: file path if file process function is not defined or file process function result otherwise
- Return type
list[str | Any | Exception]
- async request(method: sneakpeek.scraper.model.HttpMethod, url: str | list[str], *, headers: Optional[dict[str, str]] = None, max_concurrency: int = 0, return_exceptions: bool = False, **kwargs) aiohttp.client_reqrep.ClientResponse | list[aiohttp.client_reqrep.ClientResponse | Exception]
Perform HTTP request to the given URL(s)
- Parameters
method (HttpMethod) – HTTP request method to perform
url (str | list[str]) – URL(s) to send HTTP request to
headers (HttpHeaders | None, optional) – HTTP headers. Defaults to None.
max_concurrency (int, optional) – Maximum number of concurrent requests. If set to 0 no limit is applied. Defaults to 0.
return_exceptions (bool, optional) – Whether to return exceptions instead of raising them when multiple URLs are provided. Defaults to False.
**kwargs – See aiohttp.request() for the full list of arguments
- Returns
HTTP response(s)
- Return type
Response
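The bounded fan-out that max_concurrency and return_exceptions describe can be sketched with asyncio; fetch_all and fetch are hypothetical stand-ins for the context's request logic, not the library's implementation:

```python
import asyncio

# Sketch of the assumed semantics: run one coroutine per URL, optionally
# capped by a semaphore (max_concurrency=0 means unlimited), optionally
# collecting exceptions instead of raising.
async def fetch_all(fetch, urls, max_concurrency=0, return_exceptions=False):
    sem = asyncio.Semaphore(max_concurrency) if max_concurrency > 0 else None

    async def one(url):
        if sem is None:
            return await fetch(url)
        async with sem:  # at most max_concurrency requests in flight
            return await fetch(url)

    return await asyncio.gather(*(one(u) for u in urls),
                                return_exceptions=return_exceptions)
```

With return_exceptions=True a failing URL yields its exception in the result list instead of aborting the whole batch.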
- async update_scraper_state(state: str) sneakpeek.scraper.model.Scraper
Update scraper state
- Parameters
state (str) – State to persist
- Returns
Updated scraper metadata
- Return type
Scraper
- class sneakpeek.scraper.runner.ScraperRunner(scraper_storage: sneakpeek.scraper.model.ScraperStorageABC, middlewares: Optional[list[sneakpeek.scraper.model.Middleware]] = None, loop: Optional[asyncio.events.AbstractEventLoop] = None)
Bases:
sneakpeek.scraper.model.ScraperRunnerABC
Default scraper runner implementation that is meant to be used in the Sneakpeek server
- Parameters
scraper_storage (ScraperStorageABC) – Sneakpeek scrapers storage implementation
middlewares (list[Middleware] | None, optional) – List of middleware that will be used by the scraper runner. Defaults to None.
loop (asyncio.AbstractEventLoop | None, optional) – Event loop to run scrapers in. Defaults to None.
- Return type
None
- async run(handler: sneakpeek.scraper.model.ScraperHandler, scraper: sneakpeek.scraper.model.Scraper) str
- Parameters
handler (ScraperHandler) – Scraper logic implementation
scraper (Scraper) – Scraper metadata
- Return type
str
- class sneakpeek.scraper.redis_storage.RedisScraperStorage(redis: redis.asyncio.client.Redis, is_read_only: bool = False)
Bases:
sneakpeek.scraper.model.ScraperStorageABC
- Parameters
redis (redis.asyncio.client.Redis) –
is_read_only (bool) –
- Return type
None
- class sneakpeek.scraper.in_memory_storage.InMemoryScraperStorage(initial_scrapers: Optional[list[sneakpeek.scraper.model.Scraper]] = None, is_read_only: bool = False)
Bases:
sneakpeek.scraper.model.ScraperStorageABC
- Parameters
initial_scrapers (list[sneakpeek.scraper.model.Scraper] | None) –
is_read_only (bool) –
- Return type
None
- class sneakpeek.scraper.dynamic_scraper_handler.DynamicScraperHandler
Bases:
sneakpeek.scraper.model.ScraperHandler
- property name: str
Name of the handler
- async run(context: sneakpeek.scraper.model.ScraperContextABC) str
Execute scraper logic
- Parameters
context (ScraperContext) – Scraper context
- Returns
Scraper result that will be persisted in the storage (should be a relatively small piece of information that summarizes the job result)
- Return type
str
- class sneakpeek.middleware.base.BaseMiddleware
Bases:
sneakpeek.scraper.model.Middleware
- property name: str
Name of the middleware
- async on_request(request: sneakpeek.scraper.model.Request, config: Optional[Any]) sneakpeek.scraper.model.Request
Function that is called on each (HTTP) request before it is dispatched.
- async on_response(request: sneakpeek.scraper.model.Request, response: aiohttp.client_reqrep.ClientResponse, config: Optional[Any] = None) Coroutine[Any, Any, aiohttp.client_reqrep.ClientResponse]
Function that is called on each (HTTP) response before its result is returned to the caller.
- Parameters
request (Request) – Request metadata
response (aiohttp.ClientResponse) – HTTP Response
config (Any | None, optional) – Middleware configuration. Defaults to None.
- Returns
HTTP Response
- Return type
aiohttp.ClientResponse
- class sneakpeek.middleware.parser.ParserMiddleware
Bases:
sneakpeek.middleware.base.BaseMiddleware
Parser middleware provides parsing utilities
- property name: str
Name of the middleware
- regex(text: str, pattern: str, flags: re.RegexFlag = re.UNICODE | re.MULTILINE | re.IGNORECASE) list[sneakpeek.middleware.parser.RegexMatch]
Find matches in the text using regular expression
- Parameters
text (str) – Text to search in
pattern (str) – Regular expression
flags (re.RegexFlag, optional) – Regular expression flags. Defaults to re.UNICODE | re.MULTILINE | re.IGNORECASE.
- Returns
Matches found in the text
- Return type
list[RegexMatch]
- class sneakpeek.middleware.parser.RegexMatch(full_match: str, groups: dict[str, str])
Bases:
object
Regex match
- Parameters
full_match (str) –
groups (dict[str, str]) –
- Return type
None
- full_match: str
Full regular expression match
- groups: dict[str, str]
Regular expression group matches
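The RegexMatch shape (full match plus named-group dict) can be reproduced with the stdlib re module; regex_matches below is an illustrative sketch, not the middleware's own method:

```python
import re

# Sketch of the documented result shape: one entry per match, carrying the
# full match text and the named-group dictionary.
def regex_matches(text, pattern,
                  flags=re.UNICODE | re.MULTILINE | re.IGNORECASE):
    return [
        {"full_match": m.group(0), "groups": m.groupdict()}
        for m in re.finditer(pattern, text, flags)
    ]
```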
- class sneakpeek.middleware.proxy_middleware.ProxyMiddleware(default_config: Optional[sneakpeek.middleware.proxy_middleware.ProxyMiddlewareConfig] = None)
Bases:
sneakpeek.middleware.base.BaseMiddleware
Proxy middleware automatically sets proxy arguments for all HTTP requests.
- Parameters
default_config (sneakpeek.middleware.proxy_middleware.ProxyMiddlewareConfig | None) –
- Return type
None
- property name: str
Name of the middleware
- async on_request(request: sneakpeek.scraper.model.Request, config: Optional[Any]) sneakpeek.scraper.model.Request
Function that is called on each (HTTP) request before it is dispatched.
- class sneakpeek.middleware.proxy_middleware.ProxyMiddlewareConfig(*, proxy: str | yarl.URL | None = None, proxy_auth: aiohttp.helpers.BasicAuth | None = None)
Bases:
pydantic.main.BaseModel
Proxy middleware config
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
- Parameters
proxy (str | yarl.URL | None) –
proxy_auth (aiohttp.helpers.BasicAuth | None) –
- Return type
None
- proxy: str | yarl.URL | None
Proxy URL
- proxy_auth: aiohttp.helpers.BasicAuth | None
Proxy authentication info to use
- exception sneakpeek.middleware.rate_limiter_middleware.RateLimitedException
Bases:
Exception
Request is rate limited because too many requests were made to the host
- class sneakpeek.middleware.rate_limiter_middleware.RateLimitedStrategy(value)
Bases:
enum.Enum
What to do if the request is rate limited
- THROW = 1
Throw an exception
- WAIT = 2
Wait until request is no longer rate limited
- class sneakpeek.middleware.rate_limiter_middleware.RateLimiterMiddleware(default_config: Optional[sneakpeek.middleware.rate_limiter_middleware.RateLimiterMiddlewareConfig] = None)
Bases:
sneakpeek.middleware.base.BaseMiddleware
Rate limiter implements the leaky bucket algorithm to limit the number of requests made to hosts. If a request is rate limited, the middleware can either raise an exception or wait until the request is no longer rate limited.
- Parameters
default_config (sneakpeek.middleware.rate_limiter_middleware.RateLimiterMiddlewareConfig | None) –
- Return type
None
- property name: str
Name of the middleware
- async on_request(request: sneakpeek.scraper.model.Request, config: Optional[Any]) sneakpeek.scraper.model.Request
Function that is called on each (HTTP) request before it is dispatched.
- class sneakpeek.middleware.rate_limiter_middleware.RateLimiterMiddlewareConfig(*, max_requests: int = 60, rate_limited_strategy: sneakpeek.middleware.rate_limiter_middleware.RateLimitedStrategy = RateLimitedStrategy.WAIT, time_window: datetime.timedelta = datetime.timedelta(seconds=60))
Bases:
pydantic.main.BaseModel
Rate limiter middleware configuration
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
- Parameters
max_requests (int) –
rate_limited_strategy (sneakpeek.middleware.rate_limiter_middleware.RateLimitedStrategy) –
time_window (datetime.timedelta) –
- Return type
None
- max_requests: int
Maximum number of allowed requests per host within time window
- rate_limited_strategy: sneakpeek.middleware.rate_limiter_middleware.RateLimitedStrategy
What to do if the request is rate limited
- time_window: datetime.timedelta
Time window to aggregate requests
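The configured behavior (at most max_requests per host within time_window) can be sketched as a sliding-window limiter. This illustrates the semantics of the config fields, not the middleware's actual leaky-bucket implementation, and all names are hypothetical:

```python
from collections import deque

# Per-host sliding-window limiter: allow a request only if fewer than
# max_requests were made to that host within the last time_window seconds.
class WindowLimiter:
    def __init__(self, max_requests=60, time_window_s=60.0):
        self.max_requests = max_requests
        self.time_window_s = time_window_s
        self._hits = {}  # host -> deque of request timestamps

    def allow(self, host, now):
        hits = self._hits.setdefault(host, deque())
        while hits and now - hits[0] >= self.time_window_s:
            hits.popleft()  # drop requests that left the window
        if len(hits) >= self.max_requests:
            return False    # THROW or WAIT strategy would trigger here
        hits.append(now)
        return True
```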
- class sneakpeek.middleware.requests_logging_middleware.RequestsLoggingMiddleware(default_config: Optional[sneakpeek.middleware.requests_logging_middleware.RequestsLoggingMiddlewareConfig] = None)
Bases:
sneakpeek.scraper.model.Middleware
Requests logging middleware logs all outgoing requests and received responses.
- Parameters
default_config (sneakpeek.middleware.requests_logging_middleware.RequestsLoggingMiddlewareConfig | None) –
- Return type
None
- property name: str
Name of the middleware
- async on_request(request: sneakpeek.scraper.model.Request, config: Optional[Any]) sneakpeek.scraper.model.Request
Function that is called on each (HTTP) request before it is dispatched.
- async on_response(request: sneakpeek.scraper.model.Request, response: aiohttp.client_reqrep.ClientResponse, config: Optional[Any]) aiohttp.client_reqrep.ClientResponse
Function that is called on each (HTTP) response before its result is returned to the caller.
- Parameters
request (Request) – Request metadata
response (aiohttp.ClientResponse) – HTTP Response
config (Any | None, optional) – Middleware configuration. Defaults to None.
- Returns
HTTP Response
- Return type
aiohttp.ClientResponse
- class sneakpeek.middleware.requests_logging_middleware.RequestsLoggingMiddlewareConfig(*, log_request: bool = True, log_response: bool = True)
Bases:
pydantic.main.BaseModel
Requests logging middleware config
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
- Parameters
log_request (bool) –
log_response (bool) –
- Return type
None
- log_request: bool
Whether to log outgoing requests
- log_response: bool
Whether to log received responses
- class sneakpeek.middleware.robots_txt_middleware.RobotsTxtMiddleware(default_config: Optional[sneakpeek.middleware.robots_txt_middleware.RobotsTxtMiddlewareConfig] = None)
Bases:
sneakpeek.middleware.base.BaseMiddleware
Robots.txt middleware can log and optionally block requests if they are disallowed by website robots.txt.
- Parameters
default_config (sneakpeek.middleware.robots_txt_middleware.RobotsTxtMiddlewareConfig | None) –
- Return type
None
- property name: str
Name of the middleware
- async on_request(request: sneakpeek.scraper.model.Request, config: Optional[Any]) sneakpeek.scraper.model.Request
Function that is called on each (HTTP) request before it is dispatched.
- class sneakpeek.middleware.robots_txt_middleware.RobotsTxtMiddlewareConfig(*, violation_strategy: sneakpeek.middleware.robots_txt_middleware.RobotsTxtViolationStrategy = RobotsTxtViolationStrategy.LOG)
Bases:
pydantic.main.BaseModel
robots.txt middleware configuration
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
- Parameters
violation_strategy (sneakpeek.middleware.robots_txt_middleware.RobotsTxtViolationStrategy) –
- Return type
None
- exception sneakpeek.middleware.robots_txt_middleware.RobotsTxtViolationException
Bases:
Exception
Exception which is raised if request is disallowed by website robots.txt
- class sneakpeek.middleware.robots_txt_middleware.RobotsTxtViolationStrategy(value)
Bases:
enum.Enum
What to do if the request is disallowed by website robots.txt
- LOG = 1
Only log violation
- THROW = 2
Raise an exception on violation
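The underlying robots.txt check can be sketched with the stdlib urllib.robotparser; is_allowed is a hypothetical helper (the middleware itself presumably fetches and caches each host's robots.txt, which is not shown here):

```python
from urllib import robotparser

# Parse a robots.txt document and decide whether the given user agent may
# fetch the given URL. A LOG strategy would log a False result; a THROW
# strategy would raise on it.
def is_allowed(robots_txt, user_agent, url):
    parser = robotparser.RobotFileParser()
    parser.parse(robots_txt.splitlines())
    return parser.can_fetch(user_agent, url)
```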
- class sneakpeek.middleware.user_agent_injecter_middleware.UserAgentInjecterMiddleware(default_config: Optional[sneakpeek.middleware.user_agent_injecter_middleware.UserAgentInjecterMiddlewareConfig] = None)
Bases:
sneakpeek.middleware.base.BaseMiddleware
This middleware automatically adds a User-Agent header if it’s not present. It uses fake-useragent to generate realistic user agents.
- Parameters
default_config (sneakpeek.middleware.user_agent_injecter_middleware.UserAgentInjecterMiddlewareConfig | None) –
- Return type
None
- property name: str
Name of the middleware
- async on_request(request: sneakpeek.scraper.model.Request, config: Optional[Any]) sneakpeek.scraper.model.Request
Function that is called on each (HTTP) request before it is dispatched.
- class sneakpeek.middleware.user_agent_injecter_middleware.UserAgentInjecterMiddlewareConfig(*, use_external_data: bool = True, browsers: list[str] = ['chrome', 'edge', 'firefox', 'safari', 'opera'])
Bases:
pydantic.main.BaseModel
Middleware configuration
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
- Parameters
use_external_data (bool) –
browsers (list[str]) –
- Return type
None
- browsers: list[str]
List of browsers which are used to generate user agents
- use_external_data: bool
Whether to use external data as a fallback
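The injection rule ("add a User-Agent only when absent") can be sketched as follows; inject_user_agent and the fixed placeholder value are illustrative, since the real middleware draws values from fake-useragent:

```python
# Add a User-Agent header only when the request doesn't already carry one
# (header-name comparison is case-insensitive, as in HTTP).
def inject_user_agent(headers, generated_ua):
    headers = dict(headers or {})
    if not any(k.lower() == "user-agent" for k in headers):
        headers["User-Agent"] = generated_ua
    return headers
```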
- sneakpeek.api.create_api(scraper_storage: sneakpeek.scraper.model.ScraperStorageABC, queue: sneakpeek.queue.model.QueueABC, handlers: list[sneakpeek.scraper.model.ScraperHandler], session_logger_handler: Optional[logging.Handler] = None) fastapi_jsonrpc.API
Create JsonRPC API (FastAPI is used under the hood)
- Parameters
scraper_storage (ScraperStorageABC) – Sneakpeek scrapers storage implementation
queue (QueueABC) – Sneakpeek queue implementation
handlers (list[ScraperHandler]) – List of handlers that implement scraper logic
session_logger_handler (logging.Handler | None, optional) – Session logger handler. Defaults to None.
- Return type
fastapi_jsonrpc.API
- sneakpeek.api.get_api_entrypoint(scraper_storage: sneakpeek.scraper.model.ScraperStorageABC, queue: sneakpeek.queue.model.QueueABC, handlers: list[sneakpeek.scraper.model.ScraperHandler], session_logger_handler: Optional[sneakpeek.session_loggers.base.SessionLogger] = None) fastapi_jsonrpc.Entrypoint
Create public JsonRPC API entrypoint (mostly mimics storage and queue API)
- Parameters
scraper_storage (ScraperStorageABC) – Sneakpeek scrapers storage implementation
queue (QueueABC) – Sneakpeek queue implementation
handlers (list[ScraperHandler]) – List of handlers that implement scraper logic
session_logger_handler (SessionLogger | None, optional) – Session logger. Defaults to None.
- Returns
FastAPI JsonRPC entrypoint
- Return type
jsonrpc.Entrypoint
- class sneakpeek.logging.TaskContextInjectingFilter(name='')
Bases:
logging.Filter
Task context filter which automatically injects task metadata (task ID, name, and handler) into the logging records.
Example of usage:
logger = logging.getLogger()
handler = logging.StreamHandler()
handler.addFilter(TaskContextInjectingFilter())
logger.addHandler(handler)
Initialize a filter.
Initialize with the name of the logger which, together with its children, will have its events allowed through the filter. If no name is specified, allow every event.
- filter(record: logging.LogRecord) bool
Injects task metadata into log record:
task_id – Task ID
task_name – Task name
task_handler – Task handler
- Parameters
record (logging.LogRecord) – Log record to inject metadata into
- Returns
Always True
- Return type
bool
- sneakpeek.logging.configure_logging(level: int = 20, session_logger_handler: Optional[logging.Handler] = None)
Helper function to configure logging:
Adds console logger to the root logger
Adds scraper context injector filter to the console logger
Configures console formatting to use scraper metadata
- Parameters
level (int, optional) – Minimum logging level. Defaults to logging.INFO.
session_logger_handler (Optional[logging.Handler]) –
- sneakpeek.logging.task_context(task: sneakpeek.queue.model.Task) None
Initialize task logging context which automatically adds task metadata to the logging records
- Parameters
task (Task) – Task definition
- Return type
None
- sneakpeek.metrics.count_invocations(subsystem: str)
Decorator for measuring number of function invocations (works for both sync and async functions).
@count_invocations(subsystem="my subsystem")
def my_awesome_func(): ...
This will export the following Prometheus counter metrics:
# Total number of invocations
sneakpeek_invocations{subsystem="my subsystem", method="my_awesome_func", type="total", error=""}
# Total number of successful invocations (ones that haven't thrown an exception)
sneakpeek_invocations{subsystem="my subsystem", method="my_awesome_func", type="success", error=""}
# Total number of failed invocations (ones that have thrown an exception)
sneakpeek_invocations{subsystem="my subsystem", method="my_awesome_func", type="error", error="<Exception class name>"}
- Parameters
subsystem (str) – Subsystem name to be used in the metric annotation
- sneakpeek.metrics.measure_latency(subsystem: str)
Decorator for measuring latency of the function (works for both sync and async functions).
@measure_latency(subsystem="my subsystem")
def my_awesome_func(): ...
This will export the following Prometheus histogram metric:
sneakpeek_latency{subsystem="my subsystem", method="my_awesome_func"}
- Parameters
subsystem (str) – Subsystem name to be used in the metric annotation