Job Search Engine



2.16 User-Triggered Custom Data Scraping (3) - Managing Trigger Records

Copyright notice: please credit the source when reposting: http://www.codingsoho.com/


This section covers the backend update that is triggered via AJAX.

The database design is as follows.

job_entry holds the scraped job postings. Each user trigger creates one record in trigger_scrap_record, which has a many-to-many (m2m) relationship with job_entry.

trigger_scrap_record

  • user
  • href
  • keywords
  • last_updated_nums
  • created
  • updated

Database

Scraper side

trigger_scrap_record and job_entry are linked m2m through the join table job_entry_m2m_trigger_scrap_record. Django wraps all of this up nicely, so it is straightforward to implement there. On the scraper side, however, I had been writing raw SQL, which wasted a lot of time, so in the end I switched to the SQLAlchemy library.

The data definitions are as follows:

from sqlalchemy import Table, Column, String, Text, DateTime, Integer, Boolean, ForeignKey, create_engine
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy.ext.declarative import declarative_base
import datetime

Base = declarative_base() # generate orm base class

# classic style
job_entry_m2m_trigger_scrap_record = Table(
    'job_entry_m2m_trigger_scrap_record',
    Base.metadata,
    Column('id', Integer, primary_key=True),
    Column('job_entry_id', Integer, ForeignKey('job_entry.id')),
    Column('trigger_scrap_record_id', Integer, ForeignKey('trigger_scrap_record.id')),)

# ORM style    
class JobEntry(Base):
    __tablename__='job_entry'    
    id = Column(Integer, nullable=False, primary_key=True, autoincrement=True)
    title = Column(String(200))
    salary = Column(String(45))
    region = Column(String(45))
    degree = Column(String(45))
    experience = Column(String(45))
    company = Column(String(145))
    industry = Column(String(145))
    description = Column(Text(300))
    href = Column(String(100))
    # created = Column(DateTime, default=datetime.datetime.utcnow)
    # updated = Column(DateTime, default=datetime.datetime.utcnow)
    created = Column(DateTime, default=datetime.datetime.now)
    updated = Column(DateTime, default=datetime.datetime.now)        
    trigger_scrap_records = relationship('TriggerScrapRecord',
            secondary = job_entry_m2m_trigger_scrap_record, 
            # backref = 'job_entries', 
            backref=('job_entries', {'lazy':'dynamic'}),  # tuple form of orm.backref('job_entries', lazy='dynamic')
            # cascade="all, delete-orphan", # delete-orphan cascade is not supported on a many-to-many or many-to-one relationship when single_parent is not set.
            lazy='dynamic',
            passive_deletes=True)    
    def __repr__(self):
        return self.title

class TriggerScrapRecord(Base):
    __tablename__='trigger_scrap_record'
    id = Column(Integer, nullable=False, primary_key=True, autoincrement=True)
    keyword = Column(String(200))
    username = Column(String(45))
    email = Column(String(45))
    href = Column(Text(500))
    active = Column(Boolean, default=False)
    last_triggered = Column(DateTime, default=datetime.datetime.now)
    last_complete = Column(DateTime, default=datetime.datetime.now)
    def __repr__(self):
        return self.keyword

Note: the backref in relationship sets lazy to dynamic, i.e. backref=('job_entries', {'lazy':'dynamic'}); otherwise the following error is raised:

AttributeError: 'InstrumentedList' object has no attribute 'filter'
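
To see why the dynamic backref matters, here is a minimal usage sketch (it assumes an open SQLAlchemy session named session; the filter condition is made up):

record = session.query(TriggerScrapRecord).first()
# With a plain string backref, record.job_entries is an InstrumentedList,
# which has no .filter() and raises the AttributeError above.
# With lazy='dynamic', the backref returns a query object instead:
matched = record.job_entries.filter(JobEntry.title.like('%engineer%')).all()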

App side

The corresponding definitions in Django are as follows:

class TriggerScrapRecord(models.Model):
    keyword = models.CharField(_("keyword"),  max_length=200, null=False, blank=False)
    username = models.CharField(_("username"),  max_length=45,null=True, blank=False)
    email = models.CharField(_("mail"),  max_length=45,null=True, blank=False)
    href = models.TextField(_("href"),  max_length=500,null=True, blank=True)
    active = models.BooleanField(_("active"),  default=False)
    last_triggered = models.DateTimeField(_("last triggered")) 
    last_complete = models.DateTimeField(_("last completed")) 
    class Meta:
        db_table  = 'trigger_scrap_record'
    def __unicode__(self):
        return self.keyword

Then add the m2m relationship in JobEntry. At first I wanted to keep things simple and just point at the intermediate table by name:

    trigger_scrap_records = models.ManyToManyField(
        'TriggerScrapRecord', 
        db_table='job_entry_m2m_trigger_scrap_record', 
        related_name='job_entries',
        )

But this raised the following error:

(1054, "Unknown column 'job_entry_m2m_trigger_scrap_record.triggerscraprecord_id' in 'where clause'")

The reason seems to be that Django derives the join-table column names from the model names automatically, and this cannot be reconfigured just by setting db_table. In theory I could rename the table columns to match, but that felt too rigid, so in the end I added an explicit intermediate model:

class JobEntryM2MTriggerScrapRecord(models.Model):
    job_entry = models.ForeignKey('JobEntry')
    trigger_scrap_record = models.ForeignKey('TriggerScrapRecord')

    class Meta:
        db_table  = 'job_entry_m2m_trigger_scrap_record'

and changed the relation field as follows:

    trigger_scrap_records = models.ManyToManyField(
        'TriggerScrapRecord', 
        through=JobEntryM2MTriggerScrapRecord, 
        related_name='job_entries',
        )
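
A quick usage sketch for the through model (the lookup values are hypothetical). Note that on the older Django versions this project targets, .add() cannot be used on a ManyToManyField with a custom through model, so link rows are created via the intermediate model directly:

entry = JobEntry.objects.first()
record = TriggerScrapRecord.objects.get(keyword='python', username='demo')  # hypothetical values
# create the link row in job_entry_m2m_trigger_scrap_record
JobEntryM2MTriggerScrapRecord.objects.create(job_entry=entry, trigger_scrap_record=record)
# reverse access through related_name
record.job_entries.all()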

Admin configuration

class TriggerScrapRecordAdmin(admin.ModelAdmin):
    list_display  = [
            'keyword',
            'username',
            'active',
            'last_triggered',
            'last_complete',
        ]
    search_fields   = [
            'keyword',
            'username',
        ]
    list_filter   = [
            'username',
            'active',
            'last_triggered',
            'last_complete',
        ]
    class Meta: # note: ModelAdmin does not use an inner Meta; the model is bound via admin.site.register below
        model = TriggerScrapRecord
admin.site.register(TriggerScrapRecord, TriggerScrapRecordAdmin)

Creating the database tables

The tables are created in the scraper application.

class Singleton(type):
    def __init__(cls,name,bases,dic):
        super(Singleton,cls).__init__(name,bases,dic)
        cls.instance = None
    def __call__(cls,*args,**kwargs):
        if cls.instance is None:
            cls.instance = super(Singleton,cls).__call__(*args,**kwargs)
        return cls.instance

class SQLAlchemyHelper(object):
    __metaclass__ = Singleton        
    def __init__(self,host='127.0.0.1',user='root',password='123', db='bestjob', encoding="utf-8", echo=True, max_overflow=5):
        '''
        pymysql:
        mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
        MySQL-Python:
        mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
        '''
        self.engine = create_engine('mysql+mysqldb://{}:{}@{}:{}/{}'.format(user, password, host, 3306, db),connect_args={'charset':'utf8'}, encoding="utf-8", echo=True)
        Session = sessionmaker(bind=self.engine)
        self.session = Session()
        self.session.execute('show databases')        
        Base.metadata.create_all(self.engine) # create table
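
A minimal sketch of using the helper (the connection values are the defaults above; the record fields are made up):

helper = SQLAlchemyHelper(host='127.0.0.1', user='root', password='123', db='bestjob')
helper_again = SQLAlchemyHelper()  # Singleton metaclass: returns the same instance
assert helper is helper_again

# the tables now exist; insert a trigger record through the shared session
record = TriggerScrapRecord(keyword='python', username='demo', email='demo@example.com')
helper.session.add(record)
helper.session.commit()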

Tip: importing a Python module from a parent directory

Sometimes we need to import a Python file from another directory. For example, given the directory structure below, we want _train.py to import _lstm.py from the networks directory and _config.py from the parent directory.

_config.py
networks
    _lstm.py
    _cnn.py
pipelines 
    _train.py

Only two steps are needed:

(1) Create an empty __init__.py file in the networks folder:

_config.py
networks
    __init__.py
    _lstm.py
    _cnn.py
pipelines 
    _train.py

(2) Use the sys module to add the parent path:

import sys
sys.path.append("..")
from networks._lstm import *
from _config import *
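
sys.path.append("..") resolves against the current working directory, not the file location, so as a small variant the parent directory can be derived from __file__ instead (a sketch, same project layout as above):

import os
import sys
# add the directory one level above this file, regardless of where the script is launched from
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from networks._lstm import *
from _config import *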

Tip: URL encoding (see the keyword encoding note and the my_quote filter below)

Sync command

After a sync command is sent, the scraper starts crawling and returns an acknowledgement. Sync commands must not be sent back-to-back while a crawl is running; the previous command has to finish first.

A few changes:

jse.views.py

def build_liepiin_search_link(keywords_dict):
    base_url = "[https://www.liepin.com/zhaopin/](https://www.liepin.com/zhaopin/)"
    fields = [
        "isAnalysis",
        "dqs",
        "pubTime",
        "salary",
        "subIndustry",        
        "industryType",
        "compscale",
        "key",
        "init",
        "searchType",
        "headckid",
        # "flushckid", # MUST not set, otherwise alway show page 1 content
        "compkind",
        "fromSearchBtn",
        "sortFlag",
        "ckid",
        "jobKind",
        "industries",
        "clean_condition",
        "siTag",
        "d_sfrom",
        "d_ckId",        
        # "degradeFlag",                
        "d_pageSize",         
        "d_curPage", 
        "d_headId",         
    ]
    fields_selected = [
        "key",
        "industryType",
        "industries",
        "jobKind",
        "salary",
        "compscale",
    ]
    query_list = [field + "=" + keywords_dict.get(field, "") for field in fields_selected]
    url_query='&'.join(query_list)
    query_list = [field + "=" + keywords_dict.get(field, "") for field in fields] 
    url='&'.join(query_list)
    url = base_url + "?" + url
    # url += ("curPage" + "0")
    return url, url_query
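
A usage sketch, with parameter values mirroring the example query string shown later in this section (the keyword is the percent-encoded form of a Chinese search term):

keywords_dict = {
    "key": "%E8%8A%AF%E7%89%87",  # percent-encoded keyword, as it appears in the example below
    "industryType": "industry_01",
    "industries": "030",
    "jobKind": "1",
    "salary": "50$100",
    "compscale": "020",
}
url, url_query = build_liepiin_search_link(keywords_dict)
# url       : full search URL containing every field in `fields` (missing ones stay empty)
# url_query : shorter query string built only from fields_selected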

Several changes were made:

  1. Removed curPage; the scraper appends it while paginating (see the sketch after this list).
  2. Used a list comprehension together with join to build the URL, which simplifies the code and avoids a trailing & at the end of the URL.
  3. Updated fields: added a few fields and removed flushckid. With that parameter present, the site always returns page 1 no matter how you paginate. This field list will probably need long-term maintenance, or paging could later be driven through Selenium instead.
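
A minimal sketch of appending the page parameter while paginating (the helper name is made up; curPage is the name from the commented-out line removed above):

def page_url(base, page):
    # base is the url returned by build_liepiin_search_link above
    return base + "&curPage=" + str(page)

urls = [page_url(url, n) for n in range(3)]  # e.g. the first three result pages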

The front-end changes are small; only the variables passed along were adjusted:

        data : {
            href: $('#href_id').val(),
            keywords: $('#keywords_id').val(),
            action:"register_job_scrab",
            username:"{{request.user.username}}",
            email:$('input[name="mail"]').val()
        },

These added variables are used by the scraper backend:

  • username and keyword identify the unique record
  • email is used to send updates to the specified mailbox when the scraper runs periodically

Most of the work happens in the scraper backend.

Query command

The front end needs no changes; this is handled in the scraper backend, which looks up the record for the queried command and returns the time of the last crawl.

Displaying previously triggered records

jse.views.py

class JobScrapRegisterView(LoginRequiredMixin, FormView):
    form_class = JobScrapRegisterForm
    template_name = 'jse/job_scrap_register_form.html'
    def get_context_data(self, *args, **kwargs):
        context = super(JobScrapRegisterView,self).get_context_data(*args, **kwargs)
        context['related'] = TriggerScrapRecord.objects.filter(username=self.request.user.username)
        return context

job_scrap_register_form.html

    <h2>{% trans "Related Records:" %}</h2>
    <ul>
      {% for obj in related %}
      <!-- <li><a href="{% url 'job_entry_list' %}?{% filter urlencode %}keyword={{obj.keyword}} username={{obj.username}}{% endfilter %}">{{obj.username}} : {{obj.last_complete}}</a></li> -->
      <li><a href="{% url 'job_entry_list' %}?keyword={{obj.keyword|my_quote}}&username={{obj.username}}">{{obj.username}} : {{obj.last_complete}}</a> | <a href="{{obj.href}}">{% trans 'href' %}</a></li>
      {% endfor %}
    </ul>

Note: the keyword itself contains text like key=?, which would confuse the URL if put into the address bar as-is, so it has to be encoded.

I did not look into whether a ready-made filter exists; I wrote my own:

from django import template  # these filters live in a templatetags module
register = template.Library()

# for dict
@register.filter(name='my_urlencode')
def my_urlencode(data):
    from urllib import urlencode
    return urlencode(data)

# for string
@register.filter(name='my_quote')
def my_quote(data):
    from urllib import quote
    return quote(data)

Below is the handler that displays the results for the specified query.

views.py

class JobEntryListView(PageMixin, ListView):
    model = JobEntry
    filter_class = JobEntryFilter
    def get_context_data(self, *args, **kwargs):
        object_list= self.filter_class(self.request.GET, self.get_queryset()).qs
        if self.request.GET.get('keyword', None) and self.request.GET.get('username', None):
            keyword = self.request.GET['keyword']
            username = self.request.GET['username']
            from urllib import unquote, quote
            objs = TriggerScrapRecord.objects.filter(username=self.request.user.username)
            for _ in objs:
                if keyword.lower() == _.keyword.lower():
                    object_list = _.job_entries.all()
                    break
        kwargs.update({'object_list':object_list}) # without this, the Paginator would operate on the full object_list
        context = super(JobEntryListView, self).get_context_data(*args, **kwargs)
        ...

After filter_class has done its work, the queryset is filtered once more; of course this could also be moved into filter_class.

I originally planned to decode the keyword value from GET with unquote, but keyword = self.request.GET['keyword'] already returns the decoded value, so it can be compared against the database directly.

Original:

key=%E8%8A%AF%E7%89%87&industryType=industry_01&industries=030&jobKind=1&salary=50$100&compscale=020 

Encoded:

key%3D%25E8%258A%25AF%25E7%2589%2587%26industryType%3Dindustry_01%26industries%3D030%26jobKind%3D1%26salary%3D50%24100%26compscale%3D020
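
The encoded form should be exactly what the my_quote filter produces from the original string, and Django decodes it again automatically when the view reads request.GET (a quick check, Python 2 urllib as in the filters above):

from urllib import quote, unquote

original = "key=%E8%8A%AF%E7%89%87&industryType=industry_01&industries=030&jobKind=1&salary=50$100&compscale=020"
encoded = quote(original)  # '=' -> %3D, '&' -> %26, '%' -> %25, '$' -> %24
assert unquote(encoded) == original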

Adding the trigger page to the navigation bar

I also added a link to the admin site while I was at it:

      <ul class="nav navbar-nav navbar-right">
        <li><a href="{% url 'job_scrap_register' %}">Scrap</a></li>
        <li><a href="{% url 'admin:index' %}">Admin</a></li>

Scraper backend overview

Database

This was already covered above.

AJAX communication

I originally wanted to implement this with Flask, but after struggling with CORS issues for quite a while I gave up and stayed with Django, which I had already set up before.

from comm.views import ajax
urlpatterns = [
    url(r'^$', ajax, name="ajax"),
]

After receiving the AJAX request, the view first builds the TriggerScrapRecord-related data from the known information, then determines which command was sent:

  • For a query command, check whether a record with the given keyword already exists in the table; if so, return the last update time, otherwise return "No Records".
  • For a sync command, check whether that record is already being synced:
    • If it is, reply "wait a minute".
    • If it is not, reply that the registration is confirmed and start a new thread to crawl the data.

def ajax(request):
    response_data = {}
    response_data['result'] = 'failed'
    response_data['message'] = 'invalid input'    
    if request.method.lower() == "get":
        action = request.GET.get("action", None)
        p_data = {} # database
        p_data['keyword'] = request.GET.get("keywords", None)
        p_data['username'] = request.GET.get("username", None)
        p_data['href'] = request.GET.get("href", None)
        p_data['email'] = request.GET.get("email", None)
        p_data['active'] = 0
        p_data['last_triggered'] = datetime.datetime.now()
        p_data['last_complete'] = datetime.datetime.now()
        print(p_data)        
        if "query_scrab_status" == action:
            helper = SQLAlchemyHelper(host=sec_settings.DB_HOST,user=sec_settings.DB_USER,password=sec_settings.DB_PWD, db=sec_settings.DB_NAME)
            filter_dic = {'keyword': p_data.get('keyword', None), 'username': p_data.get('username', None)}
            objs = helper.session.query(TriggerScrapRecord).filter_by(**filter_dic)
            response_data['result'] = 'success'
            response_data['message'] = 'last sync @ {}'.format(objs.first().last_complete) if objs.count() else "No Records"
        elif "register_job_scrab" == action: 
            bs = JobBs(username=p_data['username'], keyword=p_data['keyword'], host=sec_settings.DB_HOST,user=sec_settings.DB_USER,password=sec_settings.DB_PWD, db=sec_settings.DB_NAME)
            filter_dic = {'keyword': p_data.get('keyword', None), 'username': p_data.get('username', None)}
            objs = bs.helper.session.query(TriggerScrapRecord).filter_by(**filter_dic)
            if objs.count() and objs.first().active:
                response_data['result'] = 'fail'
                response_data['message'] = 'scrap ongoing, wait a minute'
            else:
                response_data['result'] = 'success'
                response_data['message'] = 'registration confirmed'   
                t2 = threading.Thread(target=func_sync, args=(p_data.copy(),))
                t2.start()  
    return HttpResponse(json.dumps(response_data), content_type="application/json")
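
A hedged sketch of exercising this endpoint with Django's test client (the path and parameter values are illustrative; in production the call comes from the jQuery data block shown earlier):

import json
from django.test import Client

c = Client()
resp = c.get('/', {  # r'^$' in the urlpatterns above
    'action': 'query_scrab_status',
    'keywords': 'python',
    'username': 'demo',
})
print(json.loads(resp.content))  # e.g. {"result": "success", "message": "No Records"}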

The sync thread works as follows:

  • Create the scraper wrapper class JobBs. Besides the database connection parameters, it also takes username and keyword, which together form the unique key of a TriggerScrapRecord.
  • Check whether the record already exists:
    • If it exists, check whether a crawl is already in progress:
      • If so, return and do nothing.
      • Otherwise, set active=1.
    • If it does not exist, insert a new record with active=1 and save it to the database.
  • Run the crawl.
  • Update active=0 and last_complete to the current time, then save to the database.

def func_sync(*args):
    p_data = args[0]
    filter_dic = {'keyword': p_data.get('keyword', None), 'username': p_data.get('username', None)}
    bs = JobBs(username=p_data['username'], keyword=p_data['keyword'], host=sec_settings.DB_HOST,user=sec_settings.DB_USER,password=sec_settings.DB_PWD, db=sec_settings.DB_NAME)
    objs = bs.helper.session.query(TriggerScrapRecord).filter_by(**filter_dic)
    # New record or exist?
    if objs.count():
        # Query ongoing
        obj = bs.helper.session.query(TriggerScrapRecord).filter_by(**filter_dic).first()
        if obj.active:
            return
        # New query
        else:
            obj.active = 1        
    else:
        # Insert new
        p_data['active'] = 1
        obj = TriggerScrapRecord(**p_data)
        bs.helper.session.add(obj)
    bs.helper.session.commit()
    # do crawl ...................
    url_init = p_data['href']
    obj = bs.helper.session.query(TriggerScrapRecord).filter_by(**filter_dic).first()
    try:
        bs.crawler_data(url_init)
    except:        
        obj.active = 0
    # update db
    obj.active = 0
    obj.last_complete = datetime.datetime.now()
    bs.helper.session.add(obj)
    bs.helper.session.commit()

I also tried implementing this part with raw MySQLdb commands.

..........