bigquery_connectors.py

import pandas_gbq
from google.oauth2 import service_account
import os.path
import logging
from google.cloud import bigquery
from google.auth import default

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

SCOPES = ['https://www.googleapis.com/auth/bigquery']
credentials, project_id = default(scopes=SCOPES)

project_id = "gtm-kvzqx36-nmy5y"
pandas_gbq.context.credentials = credentials
pandas_gbq.context.project = project_id


def load_data_to_GBQ(df, table_name, dataset_name, load):
    """Write df to <dataset_name>.<table_name>; `load` is the if_exists mode ('replace'/'append')."""
    destination_table = f'{dataset_name}.{table_name}'
    logger.info(f'Writing to {destination_table}')

    pandas_gbq.to_gbq(df, destination_table, project_id=project_id, if_exists=load,
                      location='europe-central2')


def load_table_xentral_json(df, table_name, dataset_name, load, table_schema):
    """Same as load_data_to_GBQ, but with an explicit BigQuery table_schema."""
    destination_table = f'{dataset_name}.{table_name}'
    pandas_gbq.to_gbq(df, destination_table, project_id=project_id, if_exists=load, table_schema=table_schema,
                      location='europe-central2')


def read_gbq(query):
    """Run a query (SELECT or DML) and return the result as a DataFrame."""
    return pandas_gbq.read_gbq(
        query,
        progress_bar_type=None,
        location='europe-central2'
    )


def execute_query(query: str) -> bool:
    """Execute a statement and return True on success, False on failure."""
    try:
        logger.info(query)
        pandas_gbq.context.location = 'europe-central2'
        pandas_gbq.read_gbq(query, progress_bar_type=None)
        logger.info("Successfully executed query")
        return True
    except Exception as e:
        logger.error(f"Error executing query: {str(e)}")
        return False



def execute_query_async(query: str):
    """Run a query with an explicit job configuration (despite the name, the call is blocking)."""
    try:
        job_config = {
            'query': {
                'priority': 'INTERACTIVE',
                'useQueryCache': True,
                'timeoutMs': 120000
            }
        }
        return pandas_gbq.read_gbq(
            query,
            progress_bar_type=None,
            configuration=job_config,
            location='europe-central2'
        )
    except Exception as e:
        logger.error(f"Error executing query: {str(e)}")
        raise
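

# Illustrative usage sketch (not part of the original module): how the other
# connectors in this repo call these helpers. The DataFrame, dataset and table
# names below are assumptions.
if __name__ == "__main__":
    import pandas as pd

    sample = pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})
    # Append the frame to <dataset>.<table> in europe-central2.
    load_data_to_GBQ(sample, "demo_table", "Demo_dataset", "append")
    # Read it back as a DataFrame.
    print(read_gbq("SELECT * FROM Demo_dataset.demo_table"))
    # Run a statement where only success/failure matters.
    execute_query("DELETE FROM Demo_dataset.demo_table WHERE id = 1")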
search_console_connectors.py

import os.path
import time
import logging
from typing import Any, Dict
from datetime import datetime, timedelta

import pandas as pd
from google.oauth2 import service_account
from google.auth import default
from googleapiclient.discovery import build, Resource

from api.bigquery_connectors import load_data_to_GBQ, read_gbq

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

API_SERVICE_NAME = "webmasters"
API_VERSION = "v3"
SCOPE = ["https://www.googleapis.com/auth/webmasters"]
SEARCH_CONSOLE_DOMAINS = {
    'FR': 'sc-domain:{YOUR_DOMAIN_FR}',
    'DE': 'sc-domain:{YOUR_DOMAIN_DE}',
    'CH': 'sc-domain:{YOUR_DOMAIN_CH}',
}

DIMENSIONS = ["page", "date", "query", "country", "device"]
MAX_ROWS = 25000


def auth_using_default() -> Resource:
    credentials, project_id = default(scopes=SCOPE)
    service = build(API_SERVICE_NAME, API_VERSION, credentials=credentials)
    return service


service = auth_using_default()


def query(client: Resource, payload: Dict[str, Any], site: str) -> Dict[str, Any]:
    domain = SEARCH_CONSOLE_DOMAINS[site]
    response = client.searchanalytics().query(siteUrl=domain, body=payload).execute()
    return response


class SearchConsole:

    def set_start_date(self, site, date="2022-01-01"):
        """Reset the sync cursor so the next extract starts from `date`."""
        read_gbq(
            f"UPDATE {YOUR_PROJECT_ID}.Search_console_{site}.sync_dates "
            f"SET time_update = '{date}' WHERE table_name = 'Search_console'"
        )

    def search_console_extract(self, site):
        print(f"📍START OF DATA UPDATE - SearchConsole - {datetime.utcnow().strftime('%Y-%m-%d')}")

        start_date = read_gbq(
            f"SELECT time_update FROM {YOUR_PROJECT_ID}.Search_console_{site}.sync_dates "
            f"WHERE table_name = 'Search_console'"
        )["time_update"][0]

        count_row_gbq = read_gbq(
            f"SELECT COUNT(*) FROM {YOUR_PROJECT_ID}.Search_console_{site}.Ful_data"
        )["f0_"][0]

        end_date = datetime.utcnow().strftime("%Y-%m-%d")

        if start_date <= end_date:
            total_rows = []
            i = 0

            while True:
                payload = {
                    "startDate": start_date,
                    "endDate": end_date,
                    "dimensions": DIMENSIONS,
                    "rowLimit": MAX_ROWS,
                    "startRow": i * MAX_ROWS,
                }
                response = query(service, payload, site)

                if response.get("rows") is None:
                    break

                total_rows.extend(response.get("rows"))
                i += 1

            if not total_rows:
                print("🛑 The record does not exist, there is no data...")
            else:
                df = pd.DataFrame(total_rows)
                df[DIMENSIONS] = pd.DataFrame(df["keys"].tolist(), index=df.index)
                df = df[[
                    'clicks', 'impressions', 'ctr', 'position',
                    'page', 'date', 'query', 'country', 'device'
                ]]

                read_gbq(
                    f"DELETE FROM {YOUR_PROJECT_ID}.Search_console_{site}.Ful_data "
                    f"WHERE date IN ('{start_date}')"
                )
                time.sleep(3)

                count_row_delete_gbq = read_gbq(
                    f"SELECT COUNT(*) FROM {YOUR_PROJECT_ID}.Search_console_{site}.Ful_data"
                )["f0_"][0]
                time.sleep(3)

                load_data_to_GBQ(df, "Ful_data", f"Search_console_{site}", "append")
                time.sleep(3)

                count_row_new_gbq = read_gbq(
                    f"SELECT COUNT(*) FROM {YOUR_PROJECT_ID}.Search_console_{site}.Ful_data"
                )["f0_"][0]
                time.sleep(2)

                max_date_gbq = read_gbq(
                    f"SELECT MAX(date) FROM {YOUR_PROJECT_ID}.Search_console_{site}.Ful_data"
                )["f0_"][0]

                read_gbq(
                    f"UPDATE {YOUR_PROJECT_ID}.Search_console_{site}.sync_dates "
                    f"SET time_update = '{max_date_gbq}' WHERE table_name = 'Search_console'"
                )

                print(f" Search Console - 🟢\n\n"
                      f"available in df: {len(total_rows)}\n"
                      f"available in GBQ: {count_row_gbq}\n"
                      f"available in GBQ (del start date): {count_row_delete_gbq}\n"
                      f"available in GBQ_new: {count_row_new_gbq}\n"
                      f"MAX DATE DF: {max_date_gbq}\n"
                      f"DATA UPDATE FINISHED ✅")
stamped_io_connectors.py

import requests
import json
from requests.auth import HTTPBasicAuth
import pandas as pd
from api.bigquery_connectors import load_data_to_GBQ

username = "{YOUR_TOKEN}"
password = "{YOUR_KEY}"
base_url = "{YOUR_API_URL}"
payload = ''
headers = {
    'Content-Type': 'application/json',
}


class StampedIO:

    def table_name_validation(name):
        # BigQuery table names cannot contain '/', so replace it with '_'.
        return name.translate({ord('/'): '_'})

    def pereadresacia(response, name):
        # Dispatch the response to the handler method named after the endpoint.
        function_name = StampedIO.table_name_validation(name)
        getattr(StampedIO, function_name)(response, name)

    def Stamped_io_to_load(response, name):
        df_ful = response.json()
        if not df_ful or (isinstance(df_ful, dict) and df_ful.get('results') == []):
            return print(name, 'None data')
        StampedIO.pereadresacia(response, name)

    def account_webhooks(response, name):
        print(name, response.json())

    def dashboard_reviews(response, name):
        df_ful = response.json()
        columns = [...]  # full column list omitted in this excerpt
        df_data = df_ful['results']
        df = pd.json_normalize(df_data, sep="_")[columns]
        StampedIO.load_page(name, df, df_ful['page'], df_ful['totalPages'])

    def dashboard_products(response, name):
        df_ful = response.json()
        df_data = df_ful['results']
        df = pd.json_normalize(df_data, sep="_")
        StampedIO.load_page(name, df, df_ful['page'], df_ful['totalPages'])

    def account_groups(response, name):
        print(name, response.json())

    def dashboard_customers(response, name):
        df_ful = response.json()
        columns = [...]  # full column list omitted in this excerpt
        df_data = df_ful['results']
        df = pd.json_normalize(df_data, sep="_")[columns]
        StampedIO.load_page(name, df, df_ful['page'], df_ful['totalPages'])

    def dashboard_questions(response, name):
        print(name, response.json())

    def survey_unsubscribe(response, name):
        print(name, response.json())

    def dashboard_nps(response, name):
        print(name, response.json())

    def survey_reviews(response, name):
        print(name, response.json())

    def load_page(name, df, page_current, page_last):
        table_name = StampedIO.table_name_validation(name)
        if page_current == 1 and page_current != page_last:
            load_data_to_GBQ(df, table_name, 'Stamped', 'replace')
            return StampedIO.Stamped_io_extract(name, page_current + 1)
        if page_current == 1 and page_current == page_last:
            load_data_to_GBQ(df, table_name, 'Stamped', 'replace')
            return print('✅', table_name, ' the end load ✅')
        if page_current < page_last:
            load_data_to_GBQ(df, table_name, 'Stamped', 'append')
            return StampedIO.Stamped_io_extract(name, page_current + 1)
        if page_current == page_last:
            load_data_to_GBQ(df, table_name, 'Stamped', 'append')
            return print('✅', table_name, ' the end load ✅')

    def Stamped_io_extract(name, page):
        response = requests.get(
            f"{base_url}{name}?page={page}",
            headers=headers,
            data=payload,
            auth=HTTPBasicAuth(username, password)
        )
        if response.status_code != 200:
            print('❗❗❗', name, " -- ошибка", response.status_code, '❗❗❗')
            return
        return StampedIO.Stamped_io_to_load(response, name)
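

# Illustrative usage sketch: start pagination at page 1 for an endpoint whose
# handler is defined above ('/' in the endpoint maps to '_' in the method name).
if __name__ == "__main__":
    StampedIO.Stamped_io_extract("dashboard/reviews", 1)
    StampedIO.Stamped_io_extract("dashboard/products", 1)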
klaviyo_connectors.py

import requests
import pandas as pd
import time
import logging
from datetime import datetime, timedelta

from api.bigquery_connectors import load_data_to_GBQ, read_gbq

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

API = "&api_key={YOUR_KEY}"
parameter = 0

table_config = {
    'campaigns': "campaigns",
    'catalog-items': "full",
    'catalog-categories': "full",
    'catalog-variants': "full",
    'events': "last_updates",
    'profiles': "last_updates",
    'flows': "full",
    'lists': "last_updates",
    'metrics': "full",
    'segments': "last_updates",
    'tags': "full",
    'templates': "last_updates"
}


class Klaviyo:
    site = ''
    i = 0
    SITE_CONFIGS = None

    def __init__(self, configuration):
        self.SITE_CONFIGS = configuration

    def get_klaviyo_headers(self):
        api_key = None
        if self.site in self.SITE_CONFIGS:
            api_key = self.SITE_CONFIGS[self.site]['klaviyo_key']
        headers = {
            "accept": "application/json",
            "revision": "2023-10-15",
            "Authorization": f"Klaviyo-API-Key {api_key}"
        }
        return headers

    def zbroc_date_GBQ(self):
        # Seed sync_dates with the literal "%Y-%m-%d", which is used as a
        # "never synced" sentinel by extract_last_updates().
        dates = [{'table_name': name, 'time_update': "%Y-%m-%d"} for name in table_config]
        pd_data = pd.DataFrame(dates)
        load_data_to_GBQ(pd_data, 'sync_dates', 'Klaviyo', 'replace')

    def del_data(self, table_config):
        try:
            for name in table_config:
                read_gbq(
                    f"UPDATE {YOUR_PROJECT_ID}.Klaviyo.sync_dates "
                    f"SET time_update = '%Y-%m-%d' WHERE table_name = '{name}'"
                )
            print("📍DATE RESET - KLAVIYO")
        except Exception as e:
            print(f"❌ DATE RESET - KLAVIYO\n\nerror ->\n{e}")

    def klaviyo_extract(self, site):
        self.site = site
        print(f"📍START OF DATA UPDATE - KLAVIYO {site} - {datetime.utcnow().strftime('%Y-%m-%d')}")
        for table_name in table_config:
            self.i = 0
            self._klaviyo_extra(table_name)
        return True

    def _klaviyo_extra(self, table_name):
        value = table_config[table_name]
        try:
            if value == 'campaigns':
                self.extract_campaigns(table_name)
            elif value == "full":
                self.extract_full(table_name)
            else:
                self.extract_last_updates(table_name)
        except Exception as e:
            self.i += 1
            if self.i <= 4:
                self._klaviyo_extra(table_name)
            else:
                print(f'🛑TABLE NAME - {table_name.upper()}  error -> {e}')
        return True

    def extract_campaigns(self, table_name):
        headers = self.get_klaviyo_headers()
        site_name = self.site

        count_row_gbq = read_gbq(
            f"SELECT COUNT(*) FROM {YOUR_PROJECT_ID}.Klaviyo_{site_name}.{table_name}"
        )["f0_"][0]

        response = requests.get(
            "https://a.klaviyo.com/api/campaigns/?filter=equals(messages.channel,'email')",
            headers=headers
        )
        response_data = response.json()

        if not response_data['data']:
            print(f'TABLE NAME - {table_name.upper()} - None Data')
            return

        df = pd.DataFrame(response_data['data'])
        next_page = response_data['links']['next']

        while next_page:
            response = requests.get(next_page, headers=headers)
            data = response.json()
            df = pd.concat([df, pd.DataFrame(data['data'])], ignore_index=True)
            next_page = data['links']['next']

        # Stringify nested structures so BigQuery accepts every column.
        for col in df.columns:
            df[col] = df[col].astype(str)

        load_data_to_GBQ(df, table_name, f'Klaviyo_{site_name}', 'replace')

        read_gbq(
            f"UPDATE {YOUR_PROJECT_ID}.Klaviyo_{site_name}.sync_dates "
            f"SET time_update = '{datetime.utcnow().strftime('%Y-%m-%d')}' "
            f"WHERE table_name = '{table_name}'"
        )

        count_row_gbq_new = read_gbq(
            f"SELECT COUNT(*) FROM {YOUR_PROJECT_ID}.Klaviyo_{site_name}.{table_name}"
        )["f0_"][0]

        print(f"{site_name} TABLE - {table_name.upper()} ✅\n"
              f"Rows in Klaviyo: {len(df)}\n"
              f"Before: {count_row_gbq} → After: {count_row_gbq_new}\n")

    def extract_full(self, table_name):
        headers = self.get_klaviyo_headers()
        site_name = self.site

        try:
            count_row_gbq = read_gbq(
                f"SELECT COUNT(*) FROM {YOUR_PROJECT_ID}.Klaviyo_{site_name}.{table_name}"
            )["f0_"][0]
        except Exception:
            count_row_gbq = "None"

        response = requests.get(f"https://a.klaviyo.com/api/{table_name}", headers=headers)
        data = response.json()

        if not data['data']:
            print(f'🛑TABLE NAME - {table_name.upper()} - None Data')
            return

        df = pd.DataFrame(data['data'])
        next_page = data['links']['next']
        max_page = 10          # cap: at most max_page pages are pulled per run
        page = 1
        count_interaction = 1  # stays 1 only when the table fits in a single page

        while True:
            if not next_page or page == max_page:
                # Stringify nested structures so BigQuery accepts every column.
                for col in df.columns:
                    df[col] = df[col].astype(str)
                ids = ', '.join([f"'{x}'" for x in df["id"].tolist()])
                try:
                    read_gbq(
                        f"DELETE FROM {YOUR_PROJECT_ID}.Klaviyo_{site_name}.{table_name} WHERE id IN ({ids})"
                    )
                except Exception as e:
                    logger.error(f"Error deleting data {e}")

                # Single-page tables are replaced outright; multi-page tables are
                # upserted (matching ids deleted above, then appended).
                load_data_to_GBQ(
                    df,
                    table_name,
                    f'Klaviyo_{site_name}',
                    'replace' if count_interaction == 1 else 'append'
                )

                read_gbq(
                    f"UPDATE {YOUR_PROJECT_ID}.Klaviyo_{site_name}.sync_dates SET time_update = "
                    f"'{datetime.utcnow().strftime('%Y-%m-%d')}' WHERE table_name = '{table_name}'"
                )

                break

            response = requests.get(next_page, headers=headers)
            next_data = response.json()
            df = pd.concat([df, pd.DataFrame(next_data['data'])], ignore_index=True)
            next_page = next_data['links']['next']
            page += 1
            count_interaction += 1

    def extract_last_updates(self, table_name):
        headers = self.get_klaviyo_headers()
        site_name = self.site

        old_date = read_gbq(
            f"SELECT * FROM {YOUR_PROJECT_ID}.Klaviyo_{site_name}.sync_dates "
            f"WHERE table_name = '{table_name}'"
        )['time_update'][0]

        try:
            count_row_gbq = read_gbq(
                f"SELECT COUNT(*) FROM {YOUR_PROJECT_ID}.Klaviyo_{site_name}.{table_name}"
            )["f0_"][0]
        except Exception:
            count_row_gbq = "None"

        if old_date == "%Y-%m-%d":
            self.extract_full(table_name)
            return

        field = 'datetime' if table_name == 'events' else 'updated'
        url = f"https://a.klaviyo.com/api/{table_name}?filter=greater-than({field},{old_date})"
        response = requests.get(url, headers=headers)
        data = response.json()

        if not data['data']:
            print(f'{site_name} TABLE - {table_name.upper()} - None Data')
            return

        df = pd.DataFrame(data['data'])
        next_page = data['links']['next']
        while next_page:
            response = requests.get(next_page, headers=headers)
            data = response.json()
            df = pd.concat([df, pd.DataFrame(data['data'])], ignore_index=True)
            next_page = data['links']['next']

        # Stringify nested structures so BigQuery accepts every column.
        for col in df.columns:
            df[col] = df[col].astype(str)

        ids = ', '.join([f"'{x}'" for x in df["id"].tolist()])
        try:
            read_gbq(
                f"DELETE FROM {YOUR_PROJECT_ID}.Klaviyo_{site_name}.{table_name} WHERE id IN ({ids})"
            )
        except Exception as e:
            logger.error(f"Error deleting old data: {e}")

        load_data_to_GBQ(df, table_name, f'Klaviyo_{site_name}', 'append')

        read_gbq(
            f"UPDATE {YOUR_PROJECT_ID}.Klaviyo_{site_name}.sync_dates "
            f"SET time_update = '{datetime.utcnow().strftime('%Y-%m-%d')}' WHERE table_name = '{table_name}'"
        )
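

# Illustrative usage sketch. The configuration shape is an assumption inferred
# from get_klaviyo_headers(), which only reads 'klaviyo_key' per site.
if __name__ == "__main__":
    SITE_CONFIGS = {"FR": {"klaviyo_key": "{YOUR_KEY}"}}
    klaviyo = Klaviyo(SITE_CONFIGS)
    # Iterate over every table in table_config for the given site.
    klaviyo.klaviyo_extract("FR")
    # To force a full reload on the next run, reset the sync dates first:
    # klaviyo.del_data(table_config)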
woocommerce_connectors.py

import time
import json
import logging
import os
import pandas as pd
import requests

from datetime import datetime, timedelta
from woocommerce import API
from google.auth import default
from google.cloud import secretmanager
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

from api.bigquery_connectors import load_data_to_GBQ, read_gbq

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def get_secret():
    credentials, project = default()
    client = secretmanager.SecretManagerServiceClient()
    name = f"projects/{YOUR_PROJECT_ID}/secrets/{YOUR_SECRET_NAME}/versions/latest"
    response = client.access_secret_version(request={"name": name})
    secret_data = response.payload.data.decode("UTF-8")
    return json.loads(secret_data)


try:
    WEBSITE_CONFIGS = get_secret()
except Exception as e:
    logger.error(f"Failed to load website configs: {e}")
    WEBSITE_CONFIGS = {}

table_config = {
    'shipping_methods': "full",
    'customers': "full",
    'products/attributes': "full",
    'products/categories': "full",
    'products/tags': "full",
    'reports/sales': "full",
    'settings': "full",
    'taxes': "full",
    'coupons': "last_updates",
    'orders': "last_updates",
    'products': "last_updates"
}


class Woocommerce:
    def __init__(self, configuration):
        self.wcapi = None
        self.dataset_name = None
        self.WEBSITE_CONFIGS = configuration
        self.retry_limit = 5

    def _setup_site(self, site):
        if site not in self.WEBSITE_CONFIGS:
            raise ValueError(f"Invalid site: {site}")
        config = self.WEBSITE_CONFIGS[site]
        self.dataset_name = f"WooCommerce_{site}"

        session = requests.Session()
        retry_strategy = Retry(total=3, backoff_factor=1,
                               status_forcelist=[429, 500, 502, 503, 504])
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("https://", adapter)

        self.wcapi = API(
            url=config['url'],
            consumer_key=config['consumer_key'],
            consumer_secret=config['consumer_secret'],
            version="wc/v3",
            timeout=30,
            wp_api=True,
            requests_kwargs={'session': session}
        )

    def del_data(self, tables, site):
        self._setup_site(site)
        try:
            for name in tables:
                read_gbq(
                    f"UPDATE {YOUR_PROJECT_ID}.{self.dataset_name}.sync_dates "
                    f"SET time_update = '%Y-%m-%d' WHERE table_name = '{name}'"
                )
            logger.info(f"✅ Reset dates for {self.dataset_name}")
        except Exception as e:
            logger.error(f"❌ Error resetting dates: {e}")

    def woocommerce_extract(self, site):
        self._setup_site(site)
        logger.info(f"🚀 Start update: {self.dataset_name} - {datetime.utcnow().strftime('%Y-%m-%d')}")
        for table_name in table_config:
            self._extract_with_retry(table_name)

    def _extract_with_retry(self, table_name):
        attempts = 0
        while attempts < self.retry_limit:
            try:
                old_date = read_gbq(
                    f"SELECT * FROM {YOUR_PROJECT_ID}.{self.dataset_name}.sync_dates "
                    f"WHERE table_name = '{table_name}'"
                )['time_update'][0]

                mode = table_config[table_name]
                if old_date == '%Y-%m-%d' or mode == "full":
                    self.extract_full(table_name, old_date)
                else:
                    self.extract_last_updates(table_name, old_date)
                return
            except Exception as e:
                attempts += 1
                logger.warning(f"Attempt {attempts}/{self.retry_limit} failed for {table_name}: {e}")
        logger.error(f"🛑 Max retries reached for {table_name}")

    def extract_full(self, table_name, old_date=None):
        logger.info(f"📦 FULL extract: {table_name}")
        df = pd.DataFrame()
        url = f"{table_name}?page=1&per_page=100"
        response = self.wcapi.get(url)
        count = response.headers.get('x-wp-total', '0')
        pages = int(response.headers.get('x-wp-totalpages', '1'))

        for page in range(1, pages + 1):
            res = self.wcapi.get(f"{table_name}?page={page}&per_page=100")
            data = json.loads(res.text)
            chunk = pd.DataFrame(data)
            df = pd.concat([df, chunk], ignore_index=True)

        if "id" in df.columns:
            df = df.drop_duplicates(subset="id")

        if old_date and "date_modified" in df.columns:
            df = df[df["date_modified"] > old_date]

        if not df.empty:
            table_clean = self._sanitize_table_name(table_name)
            if old_date:
                ids = ', '.join([f"'{x}'" for x in df["id"]])
                read_gbq(f"DELETE FROM {YOUR_PROJECT_ID}.{self.dataset_name}.{table_clean} WHERE id IN ({ids})")
                time.sleep(2)
                load_data_to_GBQ(df, table_clean, self.dataset_name, 'append')
            else:
                load_data_to_GBQ(df, table_clean, self.dataset_name, 'replace')

            sync_date = df["date_modified"].max() if "date_modified" in df.columns else datetime.utcnow().strftime('%Y-%m-%d')
            read_gbq(
                f"UPDATE {YOUR_PROJECT_ID}.{self.dataset_name}.sync_dates "
                f"SET time_update = '{sync_date}' WHERE table_name = '{table_name}'"
            )
            logger.info(f"✅ Updated {table_name}: {len(df)} rows")

    def extract_last_updates(self, table_name, old_date):
        logger.info(f"📦 LAST UPDATES: {table_name} since {old_date}")
        df = pd.DataFrame()
        url = f"{table_name}?page=1&per_page=100&modified_after={old_date}"
        response = self.wcapi.get(url)
        count = response.headers.get('x-wp-total', '0')
        pages = int(response.headers.get('x-wp-totalpages', '1'))

        for page in range(1, pages + 1):
            res = self.wcapi.get(f"{table_name}?page={page}&per_page=100&modified_after={old_date}")
            data = json.loads(res.text)
            chunk = pd.DataFrame(data)
            df = pd.concat([df, chunk], ignore_index=True)

        if not df.empty:
            df = df.drop_duplicates(subset="id")
            table_clean = self._sanitize_table_name(table_name)
            ids = ', '.join([f"'{x}'" for x in df["id"]])
            read_gbq(f"DELETE FROM {YOUR_PROJECT_ID}.{self.dataset_name}.{table_clean} WHERE id IN ({ids})")
            time.sleep(2)
            load_data_to_GBQ(df, table_clean, self.dataset_name, 'append')

            sync_date = df["date_modified"].max()
            read_gbq(
                f"UPDATE {YOUR_PROJECT_ID}.{self.dataset_name}.sync_dates "
                f"SET time_update = '{sync_date}' WHERE table_name = '{table_name}'"
            )
            logger.info(f"✅ Updated {table_name}: {len(df)} rows")
        else:
            logger.info(f"🟡 No updates for {table_name}")

    def _sanitize_table_name(self, name):
        return name.replace('/', '_')
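

# Illustrative usage sketch. The per-site config keys ('url', 'consumer_key',
# 'consumer_secret') are inferred from _setup_site(); 'FR' is an assumed site code.
if __name__ == "__main__":
    woo = Woocommerce(WEBSITE_CONFIGS)
    woo.woocommerce_extract("FR")
    # Reset sync cursors for selected tables when a full re-extract is needed:
    # woo.del_data(["orders", "products"], "FR")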
xentral_connectors.py

import requests
import pandas as pd
from requests.auth import HTTPDigestAuth

from api.bigquery_connectors import load_data_to_GBQ
from api.Telegram import Telegram

telegram = Telegram()

username = "{YOUR_USERNAME}"
password = "{YOUR_PASSWORD}"
base_url = "{YOUR_BASE_URL}"


def _generate_bq_schema(df, default_type="STRING"):
    from pandas_gbq import schema
    return schema.generate_bq_schema(df, default_type=default_type)


class Xentral:

    @staticmethod
    def table_name_validation(name):
        return name.translate({ord('/'): '_'})

    @staticmethod
    def Xentral_extract_to_load(response, name):
        table_name = Xentral.table_name_validation(name)
        df_ful = response.json()

        page_current = df_ful['pagination']['page_current']
        page_last = df_ful['pagination']['page_last']
        items_current = df_ful['pagination']['items_current']
        items_total = df_ful['pagination']['items_total']
        df_data = df_ful['data']
        df = pd.DataFrame(df_data)

        telegram.telegram_mss(
            f'{table_name} --page-- {page_current} of {page_last} | '
            f'items_current: {items_current} / items_total: {items_total}'
        )

        if page_current == 1 and page_last > 1:
            load_data_to_GBQ(df, table_name, 'Xentral', 'replace')
            return Xentral.Xentral_extract(name, page_current + 1)

        if page_current == 1 and page_last == 1:
            load_data_to_GBQ(df, table_name, 'Xentral', 'replace')
            telegram.telegram_mss(f'✅ {table_name} load finished ✅')
            return

        if 1 < page_current < page_last:
            load_data_to_GBQ(df, table_name, 'Xentral', 'append')
            return Xentral.Xentral_extract(name, page_current + 1)

        if page_current == page_last:
            load_data_to_GBQ(df, table_name, 'Xentral', 'append')
            telegram.telegram_mss(f'✅ {table_name} load finished ✅')
            return

    @staticmethod
    def Xentral_extract(name, page):
        url = f"{base_url}/v1/{name}?page={page}&items=1000"
        response = requests.get(url, auth=HTTPDigestAuth(username, password))

        if response.status_code != 200:
            telegram.telegram_mss(f'❗❗❗ {name} — error {response.status_code} ❗❗❗')
            return

        return Xentral.Xentral_extract_to_load(response, name)
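

# Illustrative usage sketch: export one Xentral v1 resource page by page.
# The resource name 'artikel' is an assumption for the sketch.
if __name__ == "__main__":
    Xentral.Xentral_extract("artikel", 1)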
intercom_connectors.py

import requests
import pandas as pd
import json
import logging
from datetime import datetime
from requests.auth import HTTPBasicAuth

from api.bigquery_connectors import load_data_to_GBQ, read_gbq

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

token = "{YOUR_TOKEN}"
ID = "{YOUR_ID}"

headers = {
    "accept": "application/json",
    "Intercom-Version": "2.8"
}

WEBSITE_CONFIGS = {
    'FR': 'uweed.fr',
    'DE': 'uweed.de',
    'CH': 'uweed.ch'
}

table_config = {
    'admins': "admins",
    'admins/activity_logs': "activity_logs",
    'articles': "data",
    'help_center/collections': "data",
    'companies': "data",
    'companies/scroll': "data",
    'contacts': "data"
}


class Intercom:

    def zbroc_date_GBQ(self):
        # Seed sync_dates with the literal "%Y-%m-%d" as a "never synced" sentinel.
        tables = [{'table_name': name, 'time_update': "%Y-%m-%d"} for name in table_config]
        df = pd.DataFrame(tables)
        load_data_to_GBQ(df, 'sync_dates', 'Intercom', 'replace')

    def table_data(self, df):
        # Stringify nested structures so BigQuery accepts every column.
        for col in df.columns:
            df[col] = df[col].astype(str)
        return df

    def table_name_validation(self, name):
        return name.replace('/', '_')

    def table_load(self, table_name, site_url=None):
        payload = {}
        if site_url:
            payload['custom_attributes'] = {'SOURCE_WEBSITE': site_url}

        try:
            count_row_gbq = read_gbq(
                f"SELECT COUNT(*) FROM gtm-kvzqx36-nmy5y.Intercom.{self.table_name_validation(table_name)}"
            )["f0_"][0]
        except:
            count_row_gbq = "None"

        df = pd.DataFrame()

        url = f"https://api.intercom.io/{table_name}"
        response = requests.get(url, headers=headers, data=payload, auth=HTTPBasicAuth(token, ID))

        data = response.json().get(table_config[table_name], [])
        df = pd.concat([df, self.table_data(pd.DataFrame(data))], ignore_index=True)

        next_page = response.json().get("pages", {}).get("next", {}).get("starting_after", None)
        while next_page:
            paginated_url = f"{url}?starting_after={next_page}"
            response = requests.get(paginated_url, headers=headers, data=payload, auth=HTTPBasicAuth(token, ID))
            data = response.json().get(table_config[table_name], [])
            df = pd.concat([df, self.table_data(pd.DataFrame(data))], ignore_index=True)
            next_page = response.json().get("pages", {}).get("next", {}).get("starting_after", None)

        if not df.empty:
            load_data_to_GBQ(df, self.table_name_validation(table_name), 'Intercom', 'replace')
            sync_date = datetime.utcnow().strftime('%Y-%m-%d')
            read_gbq(
                f"UPDATE gtm-kvzqx36-nmy5y.Intercom.sync_dates "
                f"SET time_update = '{sync_date}' WHERE table_name = '{table_name}'"
            )
            count_row_new = read_gbq(
                f"SELECT COUNT(*) FROM gtm-kvzqx36-nmy5y.Intercom.{self.table_name_validation(table_name)}"
            )["f0_"][0]

            print(f"TABLE {table_name.upper()} ✅\n"
                  f"Intercom rows: {len(df)}\n"
                  f"GBQ before: {count_row_gbq} → after: {count_row_new}\n"
                  f"Updated: {sync_date}")
        else:
            print(f"🛑 TABLE {table_name.upper()} — no data")

    def Intercom_extract(self, ENABLED_SITES):
        print(f"📍INTERCOM UPDATE START — {datetime.utcnow().strftime('%Y-%m-%d')}")
        for table_name in table_config:
            if table_name != 'contacts':
                self.table_load(table_name)

        for site in ENABLED_SITES:
            self.table_load('contacts', WEBSITE_CONFIGS[site])
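

# Illustrative usage sketch: refresh every table, then load contacts per site
# using the WEBSITE_CONFIGS mapping above.
if __name__ == "__main__":
    Intercom().Intercom_extract(["FR", "DE", "CH"])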
analytics_connectors.py

import pandas as pd
from google.analytics.data_v1beta import BetaAnalyticsDataClient
from google.analytics.data_v1beta.types import (
    DateRange, Dimension, Metric, RunReportRequest
)
from google.oauth2.service_account import Credentials


class Analytics:
    def __init__(self):
        self.PROPERTY_ID = {YOUR_PROPERTY_ID}
        credentials = Credentials.from_service_account_info({YOUR_KEY})
        self.client = BetaAnalyticsDataClient(credentials=credentials)

    def get_report(self, start_date: str, end_date: str, table_name: str) -> pd.DataFrame:
        table_config = {
            "analytics_page_path": {
                "dimensions": ["date", "pagePath"],
                "metrics": ["screenPageViews", "userEngagementDuration"]
            },
            "analytics_device": {
                "dimensions": ["date", "deviceCategory", "pagePath"],
                "metrics": ["screenPageViews"]
            },
            "analytics_new_returning": {
                "dimensions": ["date", "newVsReturning", "pagePath"],
                "metrics": ["screenPageViews"]
            },
            "analytics_location": {
                "dimensions": ["date", "city", "pagePath"],
                "metrics": ["screenPageViews"]
            },
            "analytics_gender": {
                "dimensions": ["date", "city", "pagePath"],
                "metrics": ["screenPageViews"]
            },
            "analytics_age": {
                "dimensions": ["date", "city", "pagePath"],
                "metrics": ["screenPageViews"]
            }
        }

        if table_name not in table_config:
            raise ValueError(f"Unknown table_name: {table_name}")

        config = table_config[table_name]
        dimensions = config["dimensions"]
        metrics = config["metrics"]

        request = RunReportRequest(
            property=f"properties/{self.PROPERTY_ID}",
            dimensions=[Dimension(name=d) for d in dimensions],
            metrics=[Metric(name=m) for m in metrics],
            date_ranges=[DateRange(start_date=start_date, end_date=end_date)]
        )

        response = self.client.run_report(request)
        data = []

        for row in response.rows:
            row_data = {dim: dim_val.value for dim, dim_val in zip(dimensions, row.dimension_values)}
            row_data.update({met: met_val.value for met, met_val in zip(metrics, row.metric_values)})
            data.append(row_data)

        if not data:
            return pd.DataFrame([])

        df = pd.DataFrame(data, columns=dimensions + metrics)
        df['date'] = pd.to_datetime(df['date'], format='%Y%m%d').dt.strftime('%Y-%m-%d')
        return df
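

# Illustrative usage sketch: pull one of the predefined GA4 reports. The date
# range and report key are assumptions.
if __name__ == "__main__":
    ga = Analytics()
    report = ga.get_report("2024-01-01", "2024-01-31", "analytics_page_path")
    print(report.head())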
facebook_connectors.py

import json
import requests
import pandas as pd
from facebook_business.api import FacebookAdsApi
from facebook_business.adobjects.adaccount import AdAccount
from facebook_business.adobjects.ad import Ad
from facebook_business.adobjects.campaign import Campaign



class Facebook:
    def __init__(self):
        self.access_token = {YOUR_TOKEN}
        self.ad_account_id = {YOUR_AD_ACCOUNT_ID}
        FacebookAdsApi.init(access_token=self.access_token)

    def _fetch_report(self, params, base_url):
        all_data = []
        next_url = base_url

        while next_url:
            response = requests.get(next_url, params=params if next_url == base_url else None)

            if response.status_code == 200:
                result = response.json()
                all_data.extend(result.get('data', []))
                next_url = result.get('paging', {}).get('next', None)
            else:
                print(f"Ошибка: {response.status_code} - {response.text}")
                break

        return pd.DataFrame(all_data)

    def fb_get_report(self, start_date, end_date, table_name):
        url = f"https://graph.facebook.com/v17.0/{self.ad_account_id}/insights"
        params = {
            'access_token': self.access_token,
            'fields': 'date_start,date_stop,campaign_id,adset_id,ad_id,reach,impressions,unique_clicks,clicks,inline_link_clicks,frequency,spend',
            'time_increment': 1,
            'level': 'ad',
            'time_range': json.dumps({'since': start_date, 'until': end_date}),
            'limit': 100
        }
        return self._fetch_report(params, url)

    def fb_get_report_day(self, start_date, end_date, table_name):
        url = f"https://graph.facebook.com/v17.0/{self.ad_account_id}/insights"
        params = {
            'access_token': self.access_token,
            'fields': 'date_start,date_stop,campaign_id,reach,impressions,unique_clicks,clicks,inline_link_clicks,frequency,spend',
            'time_increment': 1,
            'level': 'campaign',
            'time_range': json.dumps({'since': start_date, 'until': end_date}),
            'limit': 100
        }
        return self._fetch_report(params, url)

    def fb_get_report_all_days(self, start_date, end_date, table_name):
        url = f"https://graph.facebook.com/v17.0/{self.ad_account_id}/insights"
        params = {
            'access_token': self.access_token,
            'fields': 'date_start,date_stop,campaign_id,reach,impressions,unique_clicks,clicks,inline_link_clicks,frequency,spend',
            'time_increment': 'all_days',
            'level': 'campaign',
            'time_range': json.dumps({'since': start_date, 'until': end_date}),
            'limit': 100
        }
        return self._fetch_report(params, url)

    def fb_lead_get_report(self, start_date, end_date, table_name):
        leads = []
        ads_lead_list = []

        campaigns = AdAccount(self.ad_account_id).get_campaigns(
            fields=['id', 'name'], params={}
        )
        campaign_leads = [c for c in campaigns if 'Lead' in c['name']]

        for campaign in campaign_leads:
            insights = Campaign(campaign['id']).get_insights(
                fields=['ad_id', 'campaign_id', 'actions'],
                params={
                    'time_range': {'since': start_date, 'until': end_date},
                    'level': 'ad',
                    'filtering': [{'field': 'action_type', 'operator': 'CONTAIN', 'value': 'lead'}],
                }
            )
            ads_lead_list += [
                ad['ad_id'] for ad in insights
                if 'actions' in ad and any('lead' in a['action_type'] for a in ad['actions'])
            ]

        for ad_id in ads_lead_list:
            fields = [
                'campaign_id', 'adset_id', 'adset_name', 'ad_id', 'ad_name',
                'id', 'created_time', 'field_data'
            ]
            params = {'time_range': {'since': start_date, 'until': end_date}}
            leads += Ad(ad_id).get_leads(fields=fields, params=params)

        df = pd.DataFrame(leads)

        if not df.empty:
            df['email'] = df['field_data'].apply(lambda fields: next((f['values'][0] for f in fields if f['name'] == 'email'), None))
            df['phone_number'] = df['field_data'].apply(lambda fields: next((f['values'][0] for f in fields if f['name'] == 'phone_number'), None))
            df['full_name'] = df['field_data'].apply(lambda fields: next((f['values'][0] for f in fields if f['name'] in ['full_name', 'full name']), None))
            df['post_code'] = df['field_data'].apply(lambda fields: next((f['values'][0] for f in fields if f['name'] == 'post_code'), None))
        return df

    # Helper method for tests
    def fb_get_report_campaign(self, start_date, end_date, campaign_ids):
        url = f'https://graph.facebook.com/v17.0/{self.ad_account_id}/insights'
        params = {
            'access_token': self.access_token,
            'fields': 'reach,impressions,unique_clicks,frequency,spend,clicks,inline_link_clicks',
            'time_increment': 'all_days',
            'time_range': json.dumps({'since': start_date, 'until': end_date}),
            'filtering': json.dumps([{"field": "campaign.id", "operator": "IN", "value": campaign_ids}]),
            'limit': 100
        }
        df = self._fetch_report(params, url)
        return df
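

# Illustrative usage sketch: ad-level and campaign-level insight pulls for a
# date range. The dates are assumptions; table_name is accepted but unused here.
if __name__ == "__main__":
    fb = Facebook()
    ads_df = fb.fb_get_report("2024-01-01", "2024-01-31", "facebook_ads")
    daily_df = fb.fb_get_report_day("2024-01-01", "2024-01-31", "facebook_campaigns")
    print(len(ads_df), len(daily_df))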
google_ads_connectors.py

import pandas as pd
from google.ads.googleads.client import GoogleAdsClient
from google.ads.googleads.errors import GoogleAdsException


class GoogleAds:
    def __init__(self):
        self.customer_id = {YOUR_CUSTOMER_ID}
        self.client = GoogleAdsClient.load_from_dict({YOUR_ADS_CONFIG}) 
        self.timeout = 30

    def fetch_geo_target_names(self, geo_target_ids):
        geo_target_dict = {}
        service = self.client.get_service("GoogleAdsService")

        formatted_ids = ', '.join([f"'geoTargetConstants/{_id}'" for _id in geo_target_ids])
        query = f"""
            SELECT
                geo_target_constant.resource_name,
                geo_target_constant.name
            FROM geo_target_constant
            WHERE geo_target_constant.resource_name IN ({formatted_ids})
        """
        try:
            response = service.search(customer_id=self.customer_id, query=query)
            for row in response:
                geo_id = row.geo_target_constant.resource_name.split('/')[-1]
                geo_target_dict[geo_id] = row.geo_target_constant.name
        except GoogleAdsException as ex:
            print(f"Geo Target fetch error: {ex}")
        return geo_target_dict

    def ads_get_report(self, start_date, end_date, table_name):
        service = self.client.get_service("GoogleAdsService")
        data = []

        try:
            if table_name == "ads_campaign_performance_data":
                query = f"""
                    SELECT
                        metrics.impressions, metrics.clicks,
                        segments.date, campaign.id
                    FROM campaign
                    WHERE segments.date BETWEEN '{start_date}' AND '{end_date}'
                """
                stream = service.search_stream(customer_id=self.customer_id, query=query, timeout=self.timeout)
                for batch in stream:
                    for row in batch.results:
                        data.append({
                            "date": row.segments.date,
                            "campaign_id": row.campaign.id,
                            "clicks": row.metrics.clicks,
                            "impressions": row.metrics.impressions,
                        })
                return pd.DataFrame(data)

            if table_name == "ads_gender_data":
                gender_map = {10: 'male', 11: 'female', 20: 'unknown'}
                query = f"""
                    SELECT
                        ad_group_criterion.gender.type,
                        metrics.clicks,
                        segments.date,
                        campaign.id
                    FROM gender_view
                    WHERE segments.date BETWEEN '{start_date}' AND '{end_date}'
                """
                stream = service.search_stream(customer_id=self.customer_id, query=query, timeout=self.timeout)
                for batch in stream:
                    for row in batch.results:
                        gender = gender_map.get(row.ad_group_criterion.gender.type_, "unknown")
                        data.append({
                            "date": row.segments.date,
                            "campaign_id": row.campaign.id,
                            "gender": gender,
                            "clicks": row.metrics.clicks,
                        })
                return pd.DataFrame(data)

            if table_name == "ads_age_data":
                age_map = {
                    503001: '18-24', 503002: '25-34', 503003: '35-44',
                    503004: '45-54', 503005: '55-64', 503006: '65+',
                    503999: 'unknown',
                }
                query = f"""
                    SELECT
                        ad_group_criterion.age_range.type,
                        metrics.clicks,
                        segments.date,
                        campaign.id
                    FROM age_range_view
                    WHERE segments.date BETWEEN '{start_date}' AND '{end_date}'
                """
                stream = service.search_stream(customer_id=self.customer_id, query=query, timeout=self.timeout)
                for batch in stream:
                    for row in batch.results:
                        age = age_map.get(row.ad_group_criterion.age_range.type_, "unknown")
                        data.append({
                            "date": row.segments.date,
                            "campaign_id": row.campaign.id,
                            "age": age,
                            "clicks": row.metrics.clicks,
                        })
                return pd.DataFrame(data)

            if table_name == "ads_postal_code":
                query = f"""
                    SELECT
                        segments.geo_target_postal_code,
                        metrics.clicks,
                        segments.date,
                        campaign.id
                    FROM geographic_view
                    WHERE segments.date BETWEEN '{start_date}' AND '{end_date}'
                """
                stream = service.search_stream(customer_id=self.customer_id, query=query, timeout=self.timeout)
                geo_target_ids = set()

                for batch in stream:
                    for row in batch.results:
                        geo_id = row.segments.geo_target_postal_code.split("/")[-1]
                        geo_target_ids.add(geo_id)
                        data.append({
                            "date": row.segments.date,
                            "campaign_id": row.campaign.id,
                            "postcode_id": geo_id,
                            "postal_code": "",
                            "clicks": row.metrics.clicks,
                        })

                geo_dict = self.fetch_geo_target_names(geo_target_ids)
                for item in data:
                    item["postal_code"] = geo_dict.get(item["postcode_id"], "Unknown")

                return pd.DataFrame(data)

        except GoogleAdsException as ex:
            print(f"Google Ads error: {ex}")

        return pd.DataFrame([])
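

# Illustrative usage sketch: each report key selects one GAQL query above.
# The dates are assumptions.
if __name__ == "__main__":
    ads = GoogleAds()
    campaigns_df = ads.ads_get_report("2024-01-01", "2024-01-31", "ads_campaign_performance_data")
    postcodes_df = ads.ads_get_report("2024-01-01", "2024-01-31", "ads_postal_code")
    print(len(campaigns_df), len(postcodes_df))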
matomo_connectors.py

import pandas as pd
import requests
from api.credentials.file import matomo_token, matomo_base_url

matomo_urls_1 = [
    {YOUR_URLS}
]


class Matomo:
    def __init__(self):
        # Credentials come from api.credentials.file (imported above).
        self.base_url = matomo_base_url
        self.token = matomo_token

    def df_url(self, start_date: str, end_date: str, page_url: str) -> pd.DataFrame:
        params = {
            'module': 'API',
            'idSite': 1,
            'period': 'day',
            'date': f"{start_date},{end_date}",
            'format': 'json',
            'method': 'Actions.getPageUrl',
            'pageUrl': page_url,
            'token_auth': self.token
        }

        response = requests.post(self.base_url, data=params)

        if response.status_code == 200:
            data = response.json()
            rows = [
                {**record, 'date': date}
                for date, records in data.items()
                for record in records
            ]
            return pd.DataFrame(rows)
        else:
            return pd.DataFrame([])

    def matomo_url_get_report(self, start_date: str, end_date: str, table_name: str) -> pd.DataFrame:
        all_dfs = [self.df_url(start_date, end_date, url) for url in matomo_urls_1]
        all_dfs = [df for df in all_dfs if not df.empty]

        if all_dfs:
            return pd.concat(all_dfs, ignore_index=True)
        else:
            return pd.DataFrame([])

    def matomo_get_report(self, start_date: str, end_date: str, table_name: str) -> pd.DataFrame:
        params = {
            'module': 'API',
            'idSite': 1,
            'period': 'day',
            'date': f"{start_date},{end_date}",
            'format': 'json',
            'method': 'Actions.getPageUrls',
            'token_auth': self.token
        }

        response = requests.post(self.base_url, data=params)

        if response.status_code == 200:
            data = response.json()
            rows = [
                {**record, 'date': date}
                for date, records in data.items()
                for record in records
            ]
            return pd.DataFrame(rows)
        else:
            return pd.DataFrame([])
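

# Illustrative usage sketch: daily page reports for a date range. The dates are
# assumptions; table_name is accepted but unused by these methods.
if __name__ == "__main__":
    matomo = Matomo()
    all_pages = matomo.matomo_get_report("2024-01-01", "2024-01-31", "matomo_pages")
    tracked_urls = matomo.matomo_url_get_report("2024-01-01", "2024-01-31", "matomo_urls")
    print(len(all_pages), len(tracked_urls))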
redshift_connectors.py

import io
import pandas as pd
import boto3
import redshift_connector
from botocore.config import Config


class Redshift:
    schema = 'public'

    def __init__(self):
        self.connection = self._connect()

    def _connect(self):
        return redshift_connector.connect(
            host='{YOUR_REDSHIFT_HOST}',
            database='dev',
            user='{YOUR_REDSHIFT_USER}',
            password='{YOUR_REDSHIFT_PASSWORD}',
            port=5439,
            timeout=300
        )

    def close(self):
        if self.connection:
            self.connection.close()

    def get_table_columns(self, table_name):
        query = """
            SELECT column_name, data_type
            FROM information_schema.columns
            WHERE table_name = %s AND table_schema = %s
            ORDER BY ordinal_position;
        """
        try:
            with self.connection.cursor() as cursor:
                cursor.execute(query, (table_name, self.schema))
                return cursor.fetchall()
        except Exception as e:
            print('get_table_columns ->', e)
            return None

    def get_table_data(self, table_name):
        try:
            with self.connection.cursor() as cursor:
                cursor.execute(f"SELECT * FROM {table_name};")
                return cursor.fetchall()
        except Exception as e:
            print("get_table_data ->", e)
            return None

    def get_lead_id(self):
        try:
            query = 'SELECT id FROM "dev"."public"."facebook_ad_lead"'
            with self.connection.cursor() as cursor:
                cursor.execute(query)
                return cursor.fetchall()
        except Exception as e:
            print("get_lead_id ->", e)
            return None

    def get_last_data_sync(self, table_name, table_name_sync="data_sync_status"):
        query = f"""
            SELECT MAX(end_date)
            FROM "{table_name_sync}"
            WHERE company_name = '{table_name}' AND sync_status = 'True';
        """
        try:
            with self.connection.cursor() as cursor:
                cursor.execute(query)
                result = cursor.fetchone()
                return result[0] if result else None
        except Exception as e:
            print("get_last_data_sync ->", e)
            return None

    def create_table_schema_from_df(self, df: pd.DataFrame):
        schema_parts = []
        for col in df.columns:
            schema_parts.append(f"{col} VARCHAR(500)")  # column types could be refined later
        return ", ".join(schema_parts)

    def create_table_redshift(self, table_name, schema):
        query = f"CREATE TABLE IF NOT EXISTS {table_name} ({schema});"
        try:
            with self.connection.cursor() as cursor:
                cursor.execute(query)
                self.connection.commit()
        except Exception as e:
            print("create_table_redshift ->", e)

    def append_data_with_s3(self, table_name, df: pd.DataFrame):
        bucket_name = "dot-etl"
        s3_key = f"temp-storage/{table_name}.csv"
        aws_access_key = "{YOUR_AWS_ACCESS_KEY}"
        aws_secret_key = "{YOUR_AWS_SECRET_KEY}"

        try:
            csv_buffer = io.StringIO()
            df.to_csv(csv_buffer, index=False, header=False)
            csv_buffer.seek(0)

            s3 = boto3.client(
                's3',
                aws_access_key_id=aws_access_key,
                aws_secret_access_key=aws_secret_key,
                config=Config(connect_timeout=300, read_timeout=300)
            )
            s3.put_object(Bucket=bucket_name, Key=s3_key, Body=csv_buffer.getvalue())

            copy_query = f"""
                COPY {self.schema}.{table_name}
                FROM 's3://{bucket_name}/{s3_key}'
                ACCESS_KEY_ID '{aws_access_key}'
                SECRET_ACCESS_KEY '{aws_secret_key}'
                CSV;
            """

            with self.connection.cursor() as cursor:
                cursor.execute(copy_query)
                self.connection.commit()

            return 'True', f'✅ Data loaded into {table_name} via S3.'

        except Exception as e:
            self.connection.rollback()
            return 'False', f'❌ Load error: {str(e)}'
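

# Illustrative usage sketch: stage a VARCHAR-only table from a DataFrame and
# load it through the S3 COPY path. The table name and frame are assumptions.
if __name__ == "__main__":
    redshift = Redshift()
    demo = pd.DataFrame({"id": ["1", "2"], "name": ["a", "b"]})
    ddl = redshift.create_table_schema_from_df(demo)
    redshift.create_table_redshift("demo_table", ddl)
    status, message = redshift.append_data_with_s3("demo_table", demo)
    print(status, message)
    redshift.close()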